This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 14e118afc Fix flink sql kill yarn application and getJobStatus fail 
(#5041)
14e118afc is described below

commit 14e118afc987d73d15160cd9d17501acff1aceda
Author: ChengJie1053 <[email protected]>
AuthorDate: Thu Dec 14 11:41:34 2023 +0800

    Fix flink sql kill yarn application and getJobStatus fail (#5041)
    
    * Fix flink getJobStatus bug
    
    * Fix flink sql kill yarn application and getJobStatus fail
---
 .../computation/executor/service/TaskExecutionServiceImpl.scala  | 4 ++--
 .../flink/client/deployment/ClusterDescriptorAdapter.java        | 9 +++++++++
 .../flink/executor/FlinkSQLComputationExecutor.scala             | 5 +++++
 linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml     | 2 +-
 4 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
index 23d6ff258..6b4fc64fe 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala
@@ -631,7 +631,7 @@ class TaskExecutionServiceImpl
         if (null != task) {
           sendToEntrance(task, ResponseTaskLog(logUpdateEvent.taskId, 
logUpdateEvent.log))
         } else {
-          logger.error("Task cannot null! logupdateEvent: " + 
logUpdateEvent.taskId)
+          logger.warn("Task cannot null! logupdateEvent: " + 
logUpdateEvent.taskId)
         }
       } else if (null != lastTask) {
         val executor = executorManager.getReportExecutor
@@ -733,7 +733,7 @@ class TaskExecutionServiceImpl
     if (null != executor) {
       executor.getTaskById(taskId)
     } else {
-      logger.error(s"Executor of taskId : $taskId is not cached.")
+      logger.warn(s"Executor of taskId : $taskId is not cached.")
       null
     }
   }
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index 6930a43f0..a5ac10203 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -91,6 +91,15 @@ public abstract class ClusterDescriptorAdapter implements 
Closeable {
 
   /** Returns the status of the flink job. */
   public JobStatus getJobStatus() throws JobExecutionException {
+    if (jobId == null) {
+      try {
+        LOG.info("flink getJobStatus jobId is null,sleep three seconds");
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+
+      }
+    }
+    LOG.info("flink getJobStatus jobId:{}", jobId);
     if (jobId == null) {
       throw new JobExecutionException(NO_JOB_SUBMITTED.getErrorDesc());
     }
diff --git 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
index 2e3a2bd07..60f1d9088 100644
--- 
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
+++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala
@@ -236,6 +236,11 @@ class FlinkSQLComputationExecutor(
     super.close()
   }
 
+  override def tryShutdown(): Boolean = {
+    Utils.tryAndWarn(close())
+    super.tryShutdown()
+  }
+
 }
 
 class FlinkSQLStatusListener(
diff --git a/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml 
b/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
index 3e790b6da..c75bf80fb 100644
--- a/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml
@@ -47,7 +47,7 @@
     <loggers>
       <root level="INFO">
             <appender-ref ref="stderr"/>
-            <appender-ref ref="Console"/>
+            <appender-ref ref="RollingFile"/>
             <appender-ref ref="Send"/>
         </root>
         <logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info" 
additivity="true">


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to