This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 71e699421 [bugfix] when task status is complete, ignore engine quited
unexpectedly (#4758)
71e699421 is described below
commit 71e6994215718aa2c6adf38a98ad6224f99fed15
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Mon Jul 10 20:42:42 2023 +0800
[bugfix] when task status is complete, ignore engine quited unexpectedly
(#4758)
this close #4757
---
.../computation/monitor/EngineConnMonitor.scala | 16 ++++---------
.../orchestrator/listener/task/TaskInfoEvent.scala | 10 ++++++++
.../strategy/async/AsyncTaskManager.scala | 27 ++++++++++++++++++++++
3 files changed, 41 insertions(+), 12 deletions(-)
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
index 5afb75b4a..fb23e8de1 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
@@ -32,6 +32,7 @@ import
org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, Respon
import
org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
import
org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor,
EngineConnTaskInfo}
import org.apache.linkis.orchestrator.listener.task.{
+ EngineQuitedUnexpectedlyEvent,
TaskErrorResponseEvent,
TaskLogEvent,
TaskStatusEvent
@@ -202,20 +203,11 @@ object EngineConnMonitor extends Logging {
logger.warn(
s"Will kill task ${execTask.getIDInfo()} because the engine
${executor.getEngineConnExecutor.getServiceInstance.toString} quited
unexpectedly."
)
- val errLog = LogUtils.generateERROR(
- s"Your job : ${execTask.getIDInfo()} was failed because the engine
quitted unexpectedly(任务${execTask
- .getIDInfo()}失败," +
- s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
- )
- val logEvent = TaskLogEvent(execTask, errLog)
- execTask.getPhysicalContext.pushLog(logEvent)
- val errorResponseEvent = TaskErrorResponseEvent(
+ val event = EngineQuitedUnexpectedlyEvent(
execTask,
- "task failed,Engine quitted
unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+ executor.getEngineConnExecutor.getServiceInstance.toString
)
- execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
- val statusEvent = TaskStatusEvent(execTask, ExecutionNodeStatus.Failed)
- execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
+ execTask.getPhysicalContext.broadcastSyncEvent(event)
}
}
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
index 87d1731d5..a6ef5834e 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
@@ -112,3 +112,13 @@ case class TaskYarnResourceEvent(
resourceMap: util.HashMap[String, ResourceWithStatus]
) extends TaskInfoEvent
with OrchestratorAsyncEvent
+
+/**
+ * Sync event broadcast when the engine that was executing `execTask` quit unexpectedly.
+ *
+ * @param execTask        the task that was running on the engine
+ * @param serviceInstance string form of the engine's service instance
+ */
+case class EngineQuitedUnexpectedlyEvent(execTask: ExecTask, serviceInstance: String)
+    extends TaskInfoEvent
+    with OrchestratorSyncEvent {
+
+  /** Renders as `task <id>, serviceInstance <instance>`, e.g. for log lines. */
+  override def toString: String = {
+    val taskId = execTask.getIDInfo()
+    s"task $taskId, serviceInstance $serviceInstance"
+  }
+
+}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala
index 934bf1c75..7392b3784 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncTaskManager.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.orchestrator.strategy.async
+import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.orchestrator.execution.ExecTaskRunner
import org.apache.linkis.orchestrator.execution.impl.DefaultTaskManager
@@ -46,6 +47,8 @@ class AsyncTaskManager
onTaskErrorResponseEvent(taskErrorResponseEvent)
case ExecTaskRunnerCompletedEvent(execTaskRunner) =>
addCompletedTask(execTaskRunner)
+ case event: EngineQuitedUnexpectedlyEvent =>
+ onEngineQuitedUnexpectedly(event)
case _ =>
}
}
@@ -109,4 +112,28 @@ class AsyncTaskManager
new AsyncExecTaskRunnerImpl(execTask)
}
+  /**
+   * Reacts to an [[EngineQuitedUnexpectedlyEvent]] for each runner returned by
+   * `findDealEventTaskRunner(event)`.
+   *
+   * For an [[AsyncExecTaskRunner]]:
+   *  - if the runner has already completed, the event is only logged (warn) and ignored, so a
+   *    task that has finished is not failed afterwards because its engine exited;
+   *  - otherwise an ERROR log is pushed to the task's physical context, the runner is marked
+   *    failed and its transient status is set to `ExecutionNodeStatus.Failed`.
+   * Runners of any other type are ignored.
+   *
+   * NOTE(review): `isCompleted` and `markFailed` are not performed atomically here; confirm
+   * whether a concurrent normal completion can interleave between the two calls.
+   *
+   * @param event carries the affected task and the engine's service instance string
+   */
+  def onEngineQuitedUnexpectedly(event: EngineQuitedUnexpectedlyEvent): Unit = {
+    // Use the real event class name so the log line can be grepped by type.
+    logger.info(s"received EngineQuitedUnexpectedlyEvent $event")
+    findDealEventTaskRunner(event).foreach {
+      case asyncExecTaskRunner: AsyncExecTaskRunner =>
+        val execTask = event.execTask
+        if (asyncExecTaskRunner.isCompleted) {
+          logger.warn(
+            s"task ${execTask.getIDInfo()} already complete, ignore engine ${event.serviceInstance} quited unexpectedly"
+          )
+        } else {
+          val errLog = LogUtils.generateERROR(
+            s"Your job : ${execTask.getIDInfo()} was failed because the engine quited unexpectedly(任务${execTask.getIDInfo()}失败,原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+          )
+          execTask.getPhysicalContext.pushLog(TaskLogEvent(execTask, errLog))
+          val errorMsg =
+            s"task ${execTask.getIDInfo()} failed,Engine ${event.serviceInstance} quited unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)"
+          // NOTE(review): second argument presumably an optional cause; none is available
+          // for an engine exit, so null is passed — confirm against markFailed's contract.
+          asyncExecTaskRunner.markFailed(errorMsg, null)
+          asyncExecTaskRunner.transientStatus(ExecutionNodeStatus.Failed)
+        }
+      case _ =>
+    }
+  }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]