This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new e36a2bcb0 Fix DefaultCodeExecTaskExecutorManager.instanceToExecutors 
concurrency issues (#4118)
e36a2bcb0 is described below

commit e36a2bcb037cf69dd4efe18de57c53f073a16c8e
Author: peacewong <[email protected]>
AuthorDate: Tue Jan 17 22:30:47 2023 +0800

    Fix DefaultCodeExecTaskExecutorManager.instanceToExecutors concurrency 
issues (#4118)
    
    * Fix concurrency issues #4078
    
    
    * optimize concurrentExecutor idle exit
---
 .../linkis-computation-orchestrator.md             |   1 +
 .../concurrent/monitor/TaskMonitorService.java     |   3 -
 .../async/AsyncConcurrentComputationExecutor.scala |   4 +
 .../execute/ConcurrentComputationExecutor.scala    |  35 +++++
 .../execution/AccessibleEngineConnExecution.scala  |  22 ++-
 .../engineconn/executor/entity/Executor.scala      |   2 +
 .../parser/DefaultCodeJobParserTransform.scala     |   4 +-
 .../conf/ComputationOrchestratorConf.scala         |   5 +-
 .../execute/CodeExecTaskExecutorManager.scala      |   4 +-
 .../DefaultCodeExecTaskExecutorManager.scala       |  73 +++++-----
 .../computation/execute/EngineConnTaskInfo.scala   |  37 +----
 .../computation/monitor/EngineConnMonitor.scala    | 160 +++++++++++++--------
 .../physical/CodeLogicalUnitExecTask.scala         |   2 +-
 .../service/ComputationTaskExecutionReceiver.scala |  33 +----
 14 files changed, 205 insertions(+), 180 deletions(-)

diff --git a/docs/configuration/linkis-computation-orchestrator.md 
b/docs/configuration/linkis-computation-orchestrator.md
index aaa6893a2..1b42bc227 100644
--- a/docs/configuration/linkis-computation-orchestrator.md
+++ b/docs/configuration/linkis-computation-orchestrator.md
@@ -17,5 +17,6 @@
 
|linkis-computation-orchestrator|wds.linkis.orchestrator.engine.lastupdate.timeout|5s|
 orchestrator.engine.lastupdate.timeout |
 |linkis-computation-orchestrator|wds.linkis.orchestrator.engine.timeout| 10s| 
orchestrator.engine.timeout|
 
|linkis-computation-orchestrator|wds.linkis.orchestrator.engine.activity_monitor.interval|10s|
 orchestrator.engine.activity_monitor.interval||
+|linkis-computation-orchestrator|wds.linkis.shell.white.usage|cd,ls| 
shell.white.usage|The shell whitelist configuration is no longer in effect|
  
 
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
index 591a9792f..3f081314f 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
@@ -22,12 +22,9 @@ import org.apache.linkis.engineconn.core.EngineConnObject;
 import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
 import org.apache.linkis.engineconn.executor.entity.Executor;
 
-import org.springframework.stereotype.Component;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component
 public class TaskMonitorService implements MonitorService {
 
   private static Logger LOG = 
LoggerFactory.getLogger(HardwareMonitorService.class);
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
index 870b9ea65..cf8b9c00b 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
@@ -255,4 +255,8 @@ abstract class AsyncConcurrentComputationExecutor(override 
val outputPrintLimit:
     }
   }
 
+  override def hasTaskRunning(): Boolean = {
+    getRunningTask > 0
+  }
+
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
index 0699ae779..a9447109d 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
@@ -26,6 +26,37 @@ abstract class ConcurrentComputationExecutor(override val 
outputPrintLimit: Int
     extends ComputationExecutor(outputPrintLimit)
     with ConcurrentExecutor {
 
+  private val EXECUTOR_STATUS_LOCKER = new Object
+
+  override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
+    if (isBusy) {
+      logger.error(
+        s"Executor is busy but still got new task ! Running task num : 
${getRunningTask}"
+      )
+    }
+    if (getRunningTask >= getConcurrentLimit) 
EXECUTOR_STATUS_LOCKER.synchronized {
+      if (getRunningTask >= getConcurrentLimit && 
NodeStatus.isIdle(getStatus)) {
+        logger.info(
+          s"running task: $getRunningTask > concurrent limit: 
$getConcurrentLimit, now to mark engine to busy"
+        )
+        transition(NodeStatus.Busy)
+      }
+    }
+    logger.info(s"engineConnTask(${engineConnTask.getTaskId}) running task is 
($getRunningTask) ")
+    val response = super.execute(engineConnTask)
+    if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask) {
+      EXECUTOR_STATUS_LOCKER.synchronized {
+        if (getStatus == NodeStatus.Busy && getConcurrentLimit > 
getRunningTask) {
+          logger.info(
+            s"running task($getRunningTask) < concurrent 
limit:$getConcurrentLimit, now to mark engine to Unlock "
+          )
+          transition(NodeStatus.Unlock)
+        }
+      }
+    }
+    response
+  }
+
   protected override def ensureOp[A](f: => A): A = f
 
   override def afterExecute(
@@ -33,4 +64,8 @@ abstract class ConcurrentComputationExecutor(override val 
outputPrintLimit: Int
       executeResponse: ExecuteResponse
   ): Unit = {}
 
+  override def hasTaskRunning(): Boolean = {
+    getRunningTask > 0
+  }
+
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 421968ecc..1b5713e56 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -26,7 +26,12 @@ import 
org.apache.linkis.engineconn.common.execution.EngineConnExecution
 import org.apache.linkis.engineconn.core.EngineConnObject
 import org.apache.linkis.engineconn.core.executor.ExecutorManager
 import org.apache.linkis.engineconn.core.hook.ShutdownHook
-import org.apache.linkis.engineconn.executor.entity.{Executor, LabelExecutor, 
ResourceExecutor}
+import org.apache.linkis.engineconn.executor.entity.{
+  ConcurrentExecutor,
+  Executor,
+  LabelExecutor,
+  ResourceExecutor
+}
 import 
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
 import org.apache.linkis.engineconn.executor.service.ManagerService
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
@@ -111,7 +116,10 @@ class AccessibleEngineConnExecution extends 
EngineConnExecution with Logging {
               ) || NodeStatus.Idle.equals(accessibleExecutor.getStatus))
               && System.currentTimeMillis - 
accessibleExecutor.getLastActivityTime > maxFreeTime
           ) {
-            if (ifECCanMaintain()) {
+            if (isConcurrentExecutorHasTaskRunning(accessibleExecutor)) {
+              logger.info("ConcurrentExecutor has task running ec will not be 
killed at this time")
+              accessibleExecutor.updateLastActivityTime()
+            } else if (isECCanMaintain()) {
               logger.info("ec will not be killed at this time")
               accessibleExecutor.updateLastActivityTime()
             } else {
@@ -159,7 +167,15 @@ class AccessibleEngineConnExecution extends 
EngineConnExecution with Logging {
     }
   }
 
-  private def ifECCanMaintain(): Boolean = {
+  private def isConcurrentExecutorHasTaskRunning(executor: Executor): Boolean 
= {
+    executor match {
+      case concurrentExecutor: ConcurrentExecutor =>
+        concurrentExecutor.hasTaskRunning()
+      case _ => false
+    }
+  }
+
+  private def isECCanMaintain(): Boolean = {
     if (!isMaintainSupported()) return false
     val engineTypeLabel =
       
LabelUtil.getEngineTypeLabel(EngineConnObject.getEngineCreationContext.getLabels())
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
index f448713f9..85189d400 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
@@ -50,4 +50,6 @@ trait ConcurrentExecutor extends Executor {
 
   def killAll(): Unit
 
+  def hasTaskRunning(): Boolean
+
 }
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
index 7ffdcbfdb..535e09fb2 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
@@ -24,7 +24,7 @@ import org.apache.linkis.orchestrator.parser.Parser
 import org.apache.linkis.orchestrator.plans.ast.{ASTContext, Job}
 import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.collect.Lists
@@ -63,7 +63,7 @@ class DefaultCodeJobParserTransform extends ParserTransform 
with Logging {
 
   def splitCode(codeJob: CodeJob): Array[CodeLogicalUnit] = {
     val codeLogicalUnits = new ArrayBuffer[CodeLogicalUnit]
-    codeJob.getCodeLogicalUnit.getCodes.asScala.foreach { code =>
+    codeJob.getCodeLogicalUnit.getCodes.foreach { code =>
       code.split(codeJob.getCodeLogicalUnit.getSeparator).foreach { line =>
         codeLogicalUnits.append(
           new CodeLogicalUnit(
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
index e813a2190..6cc1c7e48 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
@@ -40,10 +40,7 @@ object ComputationOrchestratorConf {
     
"rm,sh,find,kill,python,for,source,hdfs,hadoop,spark-sql,spark-submit,pyspark,spark-shell,hive,yarn"
   )
 
-  val SHELL_WHITE_USAGE = CommonVars(
-    "wds.linkis.shell.white.usage",
-    
"sqoop,cd,ll,ls,echo,cat,tree,diff,who,grep,whoami,set,pwd,cut,file,head,less,if,while"
-  )
+  val SHELL_WHITE_USAGE = CommonVars("wds.linkis.shell.white.usage", "cd,ls")
 
   val SHELL_WHITE_USAGE_ENABLED = 
CommonVars("wds.linkis.shell.white.usage.enabled", false)
 
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
index 9bfab20de..428db5f28 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
@@ -37,7 +37,7 @@ trait CodeExecTaskExecutorManager {
 
   def askExecutor(execTask: CodeLogicalUnitExecTask, wait: Duration): 
Option[CodeExecTaskExecutor]
 
-  def addEngineConnTaskID(executor: CodeExecTaskExecutor): Unit
+  def addEngineConnTaskInfo(executor: CodeExecTaskExecutor): Unit
 
   def getByEngineConnAndTaskId(
       serviceInstance: ServiceInstance,
@@ -93,7 +93,7 @@ trait CodeExecTaskExecutorManager {
       isSucceed: Boolean
   ): Unit
 
-  def getAllInstanceToExecutorCache(): java.util.Map[ServiceInstance, 
Array[CodeExecTaskExecutor]]
+  def getAllInstanceToExecutorCache(): java.util.Map[EngineConnTaskInfo, 
CodeExecTaskExecutor]
 
   def getAllExecTaskToExecutorCache(): java.util.Map[String, 
CodeExecTaskExecutor]
 
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
index 03b0c31c6..8f95172fb 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.orchestrator.computation.execute
 import org.apache.linkis.common.ServiceInstance
 import org.apache.linkis.common.exception.LinkisRetryException
 import org.apache.linkis.common.log.LogUtils
-import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
+import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.manager.label.constant.LabelKeyConstant
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.entrance.LoadBalanceLabel
@@ -29,7 +29,11 @@ import 
org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorCo
 import 
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
 import org.apache.linkis.orchestrator.ecm.{EngineConnManager, 
EngineConnManagerBuilder}
 import org.apache.linkis.orchestrator.ecm.entity._
-import 
org.apache.linkis.orchestrator.exception.OrchestratorLabelConflictException
+import org.apache.linkis.orchestrator.exception.{
+  OrchestratorErrorCodeSummary,
+  OrchestratorLabelConflictException,
+  OrchestratorRetryException
+}
 import org.apache.linkis.orchestrator.listener.task.TaskLogEvent
 
 import org.apache.commons.lang3.StringUtils
@@ -43,7 +47,7 @@ import scala.concurrent.duration.Duration
 class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager 
with Logging {
 
   private val instanceToExecutors =
-    new util.concurrent.ConcurrentHashMap[ServiceInstance, 
Array[CodeExecTaskExecutor]]
+    new util.concurrent.ConcurrentHashMap[EngineConnTaskInfo, 
CodeExecTaskExecutor]
 
   private val execTaskToExecutor =
     new util.concurrent.ConcurrentHashMap[String, CodeExecTaskExecutor]()
@@ -79,10 +83,8 @@ class DefaultCodeExecTaskExecutorManager extends 
CodeExecTaskExecutorManager wit
         case t: Throwable => throw t
       } match {
         case Some(e) =>
-          logger.info(
-            s"Finished to askExecutor for execId ${execTask
-              .getIDInfo()}, wait 
${ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)}"
-          )
+          logger.info(s"Finished to askExecutor for execId ${execTask
+            .getIDInfo()}, wait ${System.currentTimeMillis() - startTime}")
           executor = Option(e)
         case _ =>
           if (System.currentTimeMillis - startTime < wait.toMillis) {
@@ -184,14 +186,12 @@ class DefaultCodeExecTaskExecutorManager extends 
CodeExecTaskExecutorManager wit
       serviceInstance: ServiceInstance,
       engineConnTaskId: String
   ): Option[CodeExecTaskExecutor] = {
-    val maybeExecutors = instanceToExecutors.get(serviceInstance)
-    if (null != maybeExecutors) {
-      val executors = maybeExecutors.filter(_.getEngineConnTaskId == 
engineConnTaskId)
-      if (null != executors && executors.nonEmpty) {
-        return Some(executors(0))
-      }
-    }
-    None
+
+    val maybeExecutors =
+      instanceToExecutors.get(EngineConnTaskInfo(serviceInstance, 
engineConnTaskId))
+    if (null != maybeExecutors && maybeExecutors.getEngineConnTaskId == 
engineConnTaskId) {
+      Some(maybeExecutors)
+    } else None
   }
 
   override def getByExecTaskId(execTaskId: String): 
Option[CodeExecTaskExecutor] = {
@@ -231,35 +231,30 @@ class DefaultCodeExecTaskExecutorManager extends 
CodeExecTaskExecutorManager wit
 
   private def removeExecutorFromInstanceToExecutors(executor: 
CodeExecTaskExecutor): Unit = {
     logger.debug(s"To delete codeExecTaskExecutor  ${executor} from 
instanceToExecutors")
-    val maybeExecutors =
-      
instanceToExecutors.get(executor.getEngineConnExecutor.getServiceInstance)
-    if (null != maybeExecutors) {
-      val executors =
-        maybeExecutors.filter(_.getEngineConnTaskId != 
executor.getEngineConnTaskId)
-      if (null != executors && executors.nonEmpty) {
-        
instanceToExecutors.put(executor.getEngineConnExecutor.getServiceInstance, 
executors)
-      } else {
-        
instanceToExecutors.remove(executor.getEngineConnExecutor.getServiceInstance)
-      }
-    }
+    val engineConnTaskInfo = EngineConnTaskInfo(
+      executor.getEngineConnExecutor.getServiceInstance,
+      executor.getEngineConnTaskId
+    )
+    instanceToExecutors.remove(engineConnTaskInfo)
+    execTaskToExecutor.remove(executor.getExecTaskId)
     logger.info(
       s"To delete exec task ${executor.getExecTask.getIDInfo()} and 
CodeExecTaskExecutor ${executor.getEngineConnExecutor.getServiceInstance} 
relation"
     )
-    execTaskToExecutor.remove(executor.getExecTaskId)
   }
 
-  override def addEngineConnTaskID(executor: CodeExecTaskExecutor): Unit = {
-    if (null == executor) return
-    execTaskToExecutor.put(executor.getExecTaskId, executor)
-    logger.info(s"To add codeExecTaskExecutor  $executor to 
instanceToExecutors")
-    val executors = instanceToExecutors.getOrDefault(
-      executor.getEngineConnExecutor.getServiceInstance,
-      Array.empty[CodeExecTaskExecutor]
-    )
-    instanceToExecutors.put(
+  override def addEngineConnTaskInfo(executor: CodeExecTaskExecutor): Unit = {
+    if (null == executor || StringUtils.isBlank(executor.getExecTaskId)) {
+      throw new OrchestratorRetryException(
+        OrchestratorErrorCodeSummary.EXECUTION_ERROR_CODE,
+        "Failed to store task information"
+      )
+    }
+    val engineConnTaskInfo = EngineConnTaskInfo(
       executor.getEngineConnExecutor.getServiceInstance,
-      executors.+:(executor)
+      executor.getEngineConnTaskId
     )
+    instanceToExecutors.put(engineConnTaskInfo, executor)
+    logger.info(s"Finished To add codeExecTaskExecutor  $executor to 
instanceToExecutors")
   }
 
   private def getEngineConnManager(labels: util.List[Label[_]]): 
EngineConnManager = {
@@ -270,8 +265,8 @@ class DefaultCodeExecTaskExecutorManager extends 
CodeExecTaskExecutorManager wit
     defaultEngineConnManager
   }
 
-  override def getAllInstanceToExecutorCache()
-      : util.Map[ServiceInstance, Array[CodeExecTaskExecutor]] = 
instanceToExecutors
+  override def getAllInstanceToExecutorCache(): util.Map[EngineConnTaskInfo, 
CodeExecTaskExecutor] =
+    instanceToExecutors
 
   override def getAllExecTaskToExecutorCache(): util.Map[String, 
CodeExecTaskExecutor] =
     execTaskToExecutor
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/EngineConnTaskInfo.scala
similarity index 58%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
copy to 
linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/EngineConnTaskInfo.scala
index f448713f9..e991ce38e 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/EngineConnTaskInfo.scala
@@ -15,39 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineconn.executor.entity
+package org.apache.linkis.orchestrator.computation.execute
 
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.ServiceInstance
 
-trait Executor extends Logging {
-
-  def getId: String
-
-  def init(): Unit
-
-  def tryReady(): Boolean
-
-  def tryShutdown(): Boolean
-
-  def tryFailed(): Boolean
-
-  def trySucceed(): Boolean
-
-  /**
-   * 仅用于Kill Executor EngineConn kill 在AccessibleService
-   */
-  def close(): Unit = {
-    logger.warn(s"Executor($getId) exit by close.")
-  }
-
-  def isClosed: Boolean
-
-}
-
-trait ConcurrentExecutor extends Executor {
-
-  def getConcurrentLimit: Int
-
-  def killAll(): Unit
-
-}
+case class EngineConnTaskInfo(ecServiceInstance: ServiceInstance, ecTaskId: 
String)
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
index 14bf62d47..5afb75b4a 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
@@ -18,9 +18,10 @@
 package org.apache.linkis.orchestrator.computation.monitor
 
 import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
-import org.apache.linkis.governance.common.entity.NodeExistStatus
+import org.apache.linkis.governance.common.entity.{ExecutionNodeStatus, 
NodeExistStatus}
 import org.apache.linkis.governance.common.protocol.engineconn.{
   RequestEngineStatusBatch,
   ResponseEngineStatusBatch
@@ -29,15 +30,19 @@ import 
org.apache.linkis.governance.common.utils.GovernanceConstant
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus, 
ResponseNodeStatus}
 import 
org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
-import org.apache.linkis.orchestrator.computation.execute.CodeExecTaskExecutor
-import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor
+import 
org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, 
EngineConnTaskInfo}
+import org.apache.linkis.orchestrator.listener.task.{
+  TaskErrorResponseEvent,
+  TaskLogEvent,
+  TaskStatusEvent
+}
 import org.apache.linkis.rpc.Sender
 import org.apache.linkis.server.{toJavaMap, BDPJettyServerHelper}
 
 import java.util
 import java.util.concurrent.TimeUnit
 
-import scala.collection.JavaConverters.{asScalaBufferConverter, 
mapAsScalaMapConverter}
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -47,43 +52,46 @@ object EngineConnMonitor extends Logging {
     ComputationOrchestratorConf.ENGINECONN_LASTUPDATE_TIMEOUT.getValue.toLong
 
   private[linkis] def addEngineExecutorStatusMonitor(
-      engineConnExecutorCache: util.Map[ServiceInstance, 
Array[CodeExecTaskExecutor]],
-      endJobByEngineInstance: ServiceInstance => Unit
+      engineConnExecutorCache: util.Map[EngineConnTaskInfo, 
CodeExecTaskExecutor]
   ): Unit = {
     val task = new Runnable {
       override def run(): Unit = Utils.tryAndWarn {
         val startTime = System.currentTimeMillis()
-        val engineExecutorCache = engineConnExecutorCache
-        val unActivityEngines = new mutable.HashSet[EngineConnExecutor]()
-        val allTaskExecutors = engineExecutorCache.values().iterator()
-        while (allTaskExecutors.hasNext) {
-          val taskExecutors = allTaskExecutors.next()
-          taskExecutors.foreach(executor => {
-            val interval = startTime - 
executor.getEngineConnExecutor.getLastUpdateTime()
-            if (interval > ENGINECONN_LASTUPDATE_TIMEOUT) {
-              unActivityEngines.add(executor.getEngineConnExecutor)
-            }
-          })
-        }
-        if (null != unActivityEngines && unActivityEngines.nonEmpty) {
-          logger.info(s"There are ${unActivityEngines.size} unActivity 
engines.")
-          val engineList = new util.ArrayList[ServiceInstance]()
-          val instanceAndExecutors =
-            new mutable.HashMap[ServiceInstance, Array[CodeExecTaskExecutor]]()
-          unActivityEngines.foreach(engine => {
-            engineList.add(engine.getServiceInstance)
-            val executors = 
engineConnExecutorCache.get(engine.getServiceInstance)
-            if (null != executors) {
-              instanceAndExecutors.put(engine.getServiceInstance, executors)
-            }
-            if (engineList.size() >= 
GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT) {
-              queryEngineStatusAndHandle(instanceAndExecutors, engineList, 
endJobByEngineInstance)
+        val unActivityExecutors =
+          new mutable.HashMap[ServiceInstance, 
ArrayBuffer[CodeExecTaskExecutor]]()
+        val allTaskExecutors = new ArrayBuffer[CodeExecTaskExecutor]()
+        allTaskExecutors.appendAll(engineConnExecutorCache.values().asScala)
+        allTaskExecutors
+          .filter(executor =>
+            (startTime - executor.getEngineConnExecutor
+              .getLastUpdateTime()) > ENGINECONN_LASTUPDATE_TIMEOUT
+          )
+          .foreach { executor =>
+            val executors = unActivityExecutors.getOrElseUpdate(
+              executor.getEngineConnExecutor.getServiceInstance,
+              new ArrayBuffer[CodeExecTaskExecutor]()
+            )
+            executors.append(executor)
+          }
+
+        if (unActivityExecutors.nonEmpty) {
+          logger.info("There are {} unActivity engineConn.", 
unActivityExecutors.size)
+
+          if (unActivityExecutors.size > 
GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT) {
+            queryEngineStatusAndHandle(unActivityExecutors, 
unActivityExecutors.keys.toList.asJava)
+          } else {
+            val engineList = new util.ArrayList[ServiceInstance]()
+            unActivityExecutors.keys.foreach(serviceInstance => {
+              engineList.add(serviceInstance)
+              if (engineList.size() >= 
GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT) {
+                queryEngineStatusAndHandle(unActivityExecutors, engineList)
+                engineList.clear()
+              }
+            })
+            if (!engineList.isEmpty) {
+              queryEngineStatusAndHandle(unActivityExecutors, engineList)
               engineList.clear()
             }
-          })
-          if (!engineList.isEmpty) {
-            queryEngineStatusAndHandle(instanceAndExecutors, engineList, 
endJobByEngineInstance)
-            engineList.clear()
           }
         }
         val endTime = System.currentTimeMillis()
@@ -106,9 +114,8 @@ object EngineConnMonitor extends Logging {
   }
 
   private def queryEngineStatusAndHandle(
-      engineConnExecutorCache: mutable.HashMap[ServiceInstance, 
Array[CodeExecTaskExecutor]],
-      engineList: util.List[ServiceInstance],
-      endJobByEngineInstance: ServiceInstance => Unit
+      unActivityExecutors: mutable.HashMap[ServiceInstance, 
ArrayBuffer[CodeExecTaskExecutor]],
+      engineList: util.List[ServiceInstance]
   ): Unit = {
     val requestEngineStatus = RequestEngineStatusBatch(engineList)
     Utils.tryAndError {
@@ -117,11 +124,14 @@ object EngineConnMonitor extends Logging {
         .ask(requestEngineStatus) match {
         case response: ResponseEngineStatusBatch =>
           if (null != response.msg) {
-            logger.info(s"ResponseEngineStatusBatch msg : ${response.msg}")
+            logger.info("ResponseEngineStatusBatch msg: {}", response.msg)
           }
           if (response.engineStatus.size() != 
requestEngineStatus.engineList.size()) {
-            logger.warn(s"ResponseEngineStatusBatch engines size : 
${response.engineStatus
-              .size()} is not euqal requet : 
${requestEngineStatus.engineList.size()}.")
+            logger.warn(
+              "ResponseEngineStatusBatch engines size: {} is not equal 
request: {}.",
+              response.engineStatus.size(): Any,
+              requestEngineStatus.engineList.size(): Any
+            )
             val unKnownEngines = new ArrayBuffer[ServiceInstance]()
             requestEngineStatus.engineList.asScala.foreach(instance => {
               if (!response.engineStatus.containsKey(instance)) {
@@ -130,14 +140,15 @@ object EngineConnMonitor extends Logging {
               }
             })
             val instances = unKnownEngines.map(_.getInstance).mkString(",")
-            logger.warn(s"These engine instances cannot be found in manager : 
${instances}")
+            logger.warn("These engine instances cannot be found in manager : 
{}", instances)
           }
           response.engineStatus.asScala.foreach(status =>
-            dealWithEngineStatus(status, engineConnExecutorCache, 
endJobByEngineInstance)
+            dealWithEngineStatus(status, unActivityExecutors)
           )
         case _ =>
           logger.warn(
-            s"Invalid response. request : 
${BDPJettyServerHelper.gson.toJson(requestEngineStatus)}"
+            "Invalid response. request : {}",
+            BDPJettyServerHelper.gson.toJson(requestEngineStatus)
           )
       }
     }
@@ -145,28 +156,26 @@ object EngineConnMonitor extends Logging {
 
   private def dealWithEngineStatus(
       status: (ServiceInstance, NodeExistStatus),
-      engineConnExecutorCache: mutable.HashMap[ServiceInstance, 
Array[CodeExecTaskExecutor]],
-      endJobByEngineInstance: ServiceInstance => Unit
+      unActivityExecutors: mutable.HashMap[ServiceInstance, 
ArrayBuffer[CodeExecTaskExecutor]]
   ): Unit = {
     status._2 match {
       case NodeExistStatus.UnExist =>
-        logger.warn(s"Engine ${status._1} is Failed, now go to clear its 
task.")
-        endJobByEngineInstance(status._1)
+        logger.warn("Engine {} is Failed, now go to clear its task.", 
status._1)
+        killTask(unActivityExecutors.get(status._1))
       case NodeExistStatus.Exist | NodeExistStatus.Unknown =>
-        val engineConnExecutor = 
engineConnExecutorCache.getOrDefault(status._1, null)
+        val engineConnExecutor = unActivityExecutors.getOrDefault(status._1, 
null)
         if (null != engineConnExecutor) {
           Utils.tryCatch {
-            // todo check - only for engine with accessible executor
             val requestNodeStatus = new RequestNodeStatus
             Sender.getSender(status._1).ask(requestNodeStatus) match {
               case rs: ResponseNodeStatus =>
                 if (NodeStatus.isCompleted(rs.getNodeStatus)) {
-                  endJobByEngineInstance(status._1)
+                  killTask(unActivityExecutors.get(status._1))
                 } else {
                   if (logger.isDebugEnabled()) {
-                    logger.debug(s"Will update 
engineConnExecutor(${status._1}) lastupdated time")
+                    logger.debug("Will update engineConnExecutor({}) 
lastupdated time", status._1)
                   }
-                  updateExecutorActivityTime(status._1, 
engineConnExecutorCache)
+                  updateExecutorActivityTime(status._1, unActivityExecutors)
                 }
               case _ =>
                 logger.warn(
@@ -175,27 +184,58 @@ object EngineConnMonitor extends Logging {
             }
           } { t: Throwable =>
             logger.error(s"Failed to get status of engineConn : ${status._1}, 
now end the job. ", t)
-            endJobByEngineInstance(status._1)
+            killTask(unActivityExecutors.get(status._1))
           }
         }
     }
   }
 
+  private def killTask(mayExecutors: 
Option[ArrayBuffer[CodeExecTaskExecutor]]): Unit = {
+    if (mayExecutors.isEmpty) {
+      logger.error("executor is not Defined")
+      return
+    }
+    val executors = mayExecutors.get
+    executors.foreach { executor =>
+      val execTask = executor.getExecTask
+      Utils.tryAndError {
+        logger.warn(
+          s"Will kill task ${execTask.getIDInfo()} because the engine 
${executor.getEngineConnExecutor.getServiceInstance.toString} quited 
unexpectedly."
+        )
+        val errLog = LogUtils.generateERROR(
+          s"Your job : ${execTask.getIDInfo()} was failed because the engine 
quitted unexpectedly(任务${execTask
+            .getIDInfo()}失败," +
+            s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+        )
+        val logEvent = TaskLogEvent(execTask, errLog)
+        execTask.getPhysicalContext.pushLog(logEvent)
+        val errorResponseEvent = TaskErrorResponseEvent(
+          execTask,
+          "task failed,Engine quitted 
unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+        )
+        execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
+        val statusEvent = TaskStatusEvent(execTask, ExecutionNodeStatus.Failed)
+        execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
+      }
+    }
+  }
+
   private def updateExecutorActivityTime(
       serviceInstance: ServiceInstance,
-      engineConnExecutorCache: mutable.HashMap[ServiceInstance, 
Array[CodeExecTaskExecutor]]
+      engineConnExecutorCache: mutable.HashMap[ServiceInstance, 
ArrayBuffer[CodeExecTaskExecutor]]
   ): Unit = {
     if (null != serviceInstance) {
-      val taskExecutorList = 
engineConnExecutorCache.getOrDefault(serviceInstance, null)
-      if (null != taskExecutorList) {
-        taskExecutorList.foreach(executor =>
+      val executors = engineConnExecutorCache.getOrDefault(serviceInstance, 
null)
+      if (null != executors) {
+        executors.foreach { executor =>
           if 
(executor.getEngineConnExecutor.getServiceInstance.equals(serviceInstance)) {
             executor.getEngineConnExecutor.updateLastUpdateTime()
           }
-        )
+        }
       } else {
         logger.warn(
-          s"EngineConnExecutor ${serviceInstance.toString} cannot be found in 
engineConnExecutorCache"
+          "EngineConnExecutor {} cannot be found in engineConnExecutorCache",
+          serviceInstance.toString
         )
       }
     }
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index e7f98c5c6..86c1cad86 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -114,7 +114,7 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], 
children: Array[ExecTask
       response match {
         case SubmitResponse(engineConnExecId) =>
           codeExecutor.setEngineConnTaskId(engineConnExecId)
-          codeExecTaskExecutorManager.addEngineConnTaskID(codeExecutor)
+          codeExecTaskExecutorManager.addEngineConnTaskInfo(codeExecutor)
           val infoMap = new util.HashMap[String, Object]
           infoMap.put(
             TaskConstant.ENGINE_INSTANCE,
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
index f1e17703d..521205927 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
@@ -45,38 +45,7 @@ class ComputationTaskExecutionReceiver extends 
TaskExecutionReceiver with Loggin
   @PostConstruct
   private def init(): Unit = {
     EngineConnMonitor.addEngineExecutorStatusMonitor(
-      codeExecTaskExecutorManager.getAllInstanceToExecutorCache(),
-      failedEngineServiceInstance => {
-        val taskToExecutorCache = 
codeExecTaskExecutorManager.getAllExecTaskToExecutorCache()
-        val allExecutor = taskToExecutorCache.values().iterator()
-        while (allExecutor.hasNext) {
-          val executor = allExecutor.next()
-          if (
-              
executor.getEngineConnExecutor.getServiceInstance.equals(failedEngineServiceInstance)
-          ) {
-            val execTask = executor.getExecTask
-            Utils.tryAndError {
-              logger.warn(
-                s"Will kill task ${execTask.getIDInfo()} because the engine 
${executor.getEngineConnExecutor.getServiceInstance.toString} quited 
unexpectedly."
-              )
-              val errLog = LogUtils.generateERROR(
-                s"Your job : ${execTask.getIDInfo()} was failed because the 
engine quitted unexpectedly(任务${execTask
-                  .getIDInfo()}失败," +
-                  s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
-              )
-              val logEvent = TaskLogEvent(execTask, errLog)
-              execTask.getPhysicalContext.pushLog(logEvent)
-              val errorResponseEvent = TaskErrorResponseEvent(
-                execTask,
-                "task failed,Engine quitted 
unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
-              )
-              
execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
-              val statusEvent = TaskStatusEvent(execTask, 
ExecutionNodeStatus.Failed)
-              execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
-            }
-          }
-        }
-      }
+      codeExecTaskExecutorManager.getAllInstanceToExecutorCache()
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to