This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 14e118afc Fix flink sql kill yarn application and getJobStatus fail
(#5041)
14e118afc is described below
commit 14e118afc987d73d15160cd9d17501acff1aceda
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu Dec 14 11:41:34 2023 +0800
Fix flink sql kill yarn application and getJobStatus fail (#5041)
* Fix flink getJobStatus bug
* Fix flink sql kill yarn application and getJobStatus fail
---
.../computation/executor/service/TaskExecutionServiceImpl.scala | 4 ++--
.../flink/client/deployment/ClusterDescriptorAdapter.java | 9 +++++++++
.../flink/executor/FlinkSQLComputationExecutor.scala | 5 +++++
linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml | 2 +-
4 files changed, 17 insertions(+), 3 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 23d6ff258..6b4fc64fe 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -631,7 +631,7 @@ class TaskExecutionServiceImpl
if (null != task) {
sendToEntrance(task, ResponseTaskLog(logUpdateEvent.taskId,
logUpdateEvent.log))
} else {
- logger.error("Task cannot null! logupdateEvent: " +
logUpdateEvent.taskId)
+ logger.warn("Task cannot null! logupdateEvent: " +
logUpdateEvent.taskId)
}
} else if (null != lastTask) {
val executor = executorManager.getReportExecutor
@@ -733,7 +733,7 @@ class TaskExecutionServiceImpl
if (null != executor) {
executor.getTaskById(taskId)
} else {
- logger.error(s"Executor of taskId : $taskId is not cached.")
+ logger.warn(s"Executor of taskId : $taskId is not cached.")
null
}
}
diff --git
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index 6930a43f0..a5ac10203 100644
---
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -91,6 +91,15 @@ public abstract class ClusterDescriptorAdapter implements
Closeable {
/** Returns the status of the flink job. */
public JobStatus getJobStatus() throws JobExecutionException {
+ if (jobId == null) {
+ try {
+ LOG.info("flink getJobStatus jobId is null,sleep three seconds");
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+
+ }
+ }
+ LOG.info("flink getJobStatus jobId:{}", jobId);
if (jobId == null) {
throw new JobExecutionException(NO_JOB_SUBMITTED.getErrorDesc());
}
diff --git
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index 2e3a2bd07..60f1d9088 100644
---
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -236,6 +236,11 @@ class FlinkSQLComputationExecutor(
super.close()
}
+ override def tryShutdown(): Boolean = {
+ Utils.tryAndWarn(close())
+ super.tryShutdown()
+ }
+
}
class FlinkSQLStatusListener(
diff --git a/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
b/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
index 3e790b6da..c75bf80fb 100644
--- a/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
@@ -47,7 +47,7 @@
<loggers>
<root level="INFO">
<appender-ref ref="stderr"/>
- <appender-ref ref="Console"/>
+ <appender-ref ref="RollingFile"/>
<appender-ref ref="Send"/>
</root>
<logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info"
additivity="true">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]