This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 71e699421 [bugfix] when task status is complete, ignore engine quited unexpectedly (#4758)
71e699421 is described below

commit 71e6994215718aa2c6adf38a98ad6224f99fed15
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Mon Jul 10 20:42:42 2023 +0800

    [bugfix] when task status is complete, ignore engine quited unexpectedly (#4758)
    
    this close #4757
---
 .../computation/monitor/EngineConnMonitor.scala    | 16 ++++---------
 .../orchestrator/listener/task/TaskInfoEvent.scala | 10 ++++++++
 .../strategy/async/AsyncTaskManager.scala          | 27 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
index 5afb75b4a..fb23e8de1 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
@@ -32,6 +32,7 @@ import 
org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, Respon
 import 
org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
 import 
org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, 
EngineConnTaskInfo}
 import org.apache.linkis.orchestrator.listener.task.{
+  EngineQuitedUnexpectedlyEvent,
   TaskErrorResponseEvent,
   TaskLogEvent,
   TaskStatusEvent
@@ -202,20 +203,13 @@ object EngineConnMonitor extends Logging {
         logger.warn(
           s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly."
         )
-        val errLog = LogUtils.generateERROR(
-          s"Your job : ${execTask.getIDInfo()} was failed because the engine quitted unexpectedly(任务${execTask
-            .getIDInfo()}失败," +
-            s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
-        )
-        val logEvent = TaskLogEvent(execTask, errLog)
-        execTask.getPhysicalContext.pushLog(logEvent)
-        val errorResponseEvent = TaskErrorResponseEvent(
+        val event = EngineQuitedUnexpectedlyEvent(
           execTask,
-          "task failed,Engine quitted unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+          executor.getEngineConnExecutor.getServiceInstance.toString
         )
-        execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
-        val statusEvent = TaskStatusEvent(execTask, ExecutionNodeStatus.Failed)
-        execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
+        // Only report the engine exit here; the EngineQuitedUnexpectedlyEvent handler decides
+        // whether the task still has to be failed (it ignores tasks that already completed).
+        execTask.getPhysicalContext.broadcastSyncEvent(event)
       }
     }
   }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
index 87d1731d5..a6ef5834e 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
@@ -112,3 +112,22 @@ case class TaskYarnResourceEvent(
     resourceMap: util.HashMap[String, ResourceWithStatus]
 ) extends TaskInfoEvent
     with OrchestratorAsyncEvent
+
+/**
+ * Emitted when the engine running `execTask` exits unexpectedly. The event only reports the
+ * exit; its handler decides whether the task still has to be failed.
+ *
+ * @param execTask
+ *   the task that was running on the engine
+ * @param serviceInstance
+ *   string form of the engine's service instance, used in log and error messages
+ */
+case class EngineQuitedUnexpectedlyEvent(execTask: ExecTask, serviceInstance: String)
+    extends TaskInfoEvent
+    with OrchestratorSyncEvent {
+
+  override def toString: String = {
+    s"task ${execTask.getIDInfo()}, serviceInstance $serviceInstance"
+  }
+
+}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala
index 934bf1c75..7392b3784 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.orchestrator.strategy.async
 
+import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
 import org.apache.linkis.orchestrator.execution.ExecTaskRunner
 import org.apache.linkis.orchestrator.execution.impl.DefaultTaskManager
@@ -46,6 +47,8 @@ class AsyncTaskManager
         onTaskErrorResponseEvent(taskErrorResponseEvent)
       case ExecTaskRunnerCompletedEvent(execTaskRunner) =>
         addCompletedTask(execTaskRunner)
+      case event: EngineQuitedUnexpectedlyEvent =>
+        onEngineQuitedUnexpectedly(event)
       case _ =>
     }
   }
@@ -109,4 +112,38 @@ class AsyncTaskManager
     new AsyncExecTaskRunnerImpl(execTask)
   }
 
+  /**
+   * Handles an [[EngineQuitedUnexpectedlyEvent]]. If the task's runner has already completed,
+   * the engine exit is only logged; otherwise an error log is pushed for the task and its runner
+   * is marked as failed.
+   *
+   * @param event
+   *   the task whose engine exited, together with the engine's service instance
+   */
+  def onEngineQuitedUnexpectedly(event: EngineQuitedUnexpectedlyEvent): Unit = {
+    logger.info(s"received EngineQuitedUnexpectedlyEvent $event")
+    val execTask = event.execTask
+    findDealEventTaskRunner(event).foreach {
+      case asyncExecTaskRunner: AsyncExecTaskRunner =>
+        // NOTE(review): isCompleted and markFailed are not done atomically; if the task can
+        // complete concurrently with this handler, confirm markFailed tolerates that.
+        if (asyncExecTaskRunner.isCompleted) {
+          logger.warn(
+            s"task ${execTask.getIDInfo()} already complete, ignore engine ${event.serviceInstance} quited unexpectedly"
+          )
+        } else {
+          val errLog = LogUtils.generateERROR(
+            s"Your job : ${execTask.getIDInfo()} was failed because the engine quited unexpectedly(任务${execTask.getIDInfo()}失败,原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+          )
+          execTask.getPhysicalContext.pushLog(TaskLogEvent(execTask, errLog))
+          val errorMsg =
+            s"task ${execTask.getIDInfo()} failed,Engine ${event.serviceInstance} quited unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)"
+          // NOTE(review): second argument is null (presumably the cause) — confirm markFailed accepts it.
+          asyncExecTaskRunner.markFailed(errorMsg, null)
+          asyncExecTaskRunner.transientStatus(ExecutionNodeStatus.Failed)
+        }
+      case _ =>
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to