This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new e36a2bcb0 Fix DefaultCodeExecTaskExecutorManager.instanceToExecutors
concurrency issues (#4118)
e36a2bcb0 is described below
commit e36a2bcb037cf69dd4efe18de57c53f073a16c8e
Author: peacewong <[email protected]>
AuthorDate: Tue Jan 17 22:30:47 2023 +0800
Fix DefaultCodeExecTaskExecutorManager.instanceToExecutors concurrency
issues (#4118)
* Fix concurrency issues #4078
* optimize concurrentExecutor idle exit
---
.../linkis-computation-orchestrator.md | 1 +
.../concurrent/monitor/TaskMonitorService.java | 3 -
.../async/AsyncConcurrentComputationExecutor.scala | 4 +
.../execute/ConcurrentComputationExecutor.scala | 35 +++++
.../execution/AccessibleEngineConnExecution.scala | 22 ++-
.../engineconn/executor/entity/Executor.scala | 2 +
.../parser/DefaultCodeJobParserTransform.scala | 4 +-
.../conf/ComputationOrchestratorConf.scala | 5 +-
.../execute/CodeExecTaskExecutorManager.scala | 4 +-
.../DefaultCodeExecTaskExecutorManager.scala | 73 +++++-----
.../computation/execute/EngineConnTaskInfo.scala | 37 +----
.../computation/monitor/EngineConnMonitor.scala | 160 +++++++++++++--------
.../physical/CodeLogicalUnitExecTask.scala | 2 +-
.../service/ComputationTaskExecutionReceiver.scala | 33 +----
14 files changed, 205 insertions(+), 180 deletions(-)
diff --git a/docs/configuration/linkis-computation-orchestrator.md
b/docs/configuration/linkis-computation-orchestrator.md
index aaa6893a2..1b42bc227 100644
--- a/docs/configuration/linkis-computation-orchestrator.md
+++ b/docs/configuration/linkis-computation-orchestrator.md
@@ -17,5 +17,6 @@
|linkis-computation-orchestrator|wds.linkis.orchestrator.engine.lastupdate.timeout|5s|
orchestrator.engine.lastupdate.timeout |
|linkis-computation-orchestrator|wds.linkis.orchestrator.engine.timeout| 10s|
orchestrator.engine.timeout|
|linkis-computation-orchestrator|wds.linkis.orchestrator.engine.activity_monitor.interval|10s|
orchestrator.engine.activity_monitor.interval||
+|linkis-computation-orchestrator|wds.linkis.shell.white.usage|cd,ls|
shell.white.usage|The shell whitelist configuration
has expired|
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
index 591a9792f..3f081314f 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TaskMonitorService.java
@@ -22,12 +22,9 @@ import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
import org.apache.linkis.engineconn.executor.entity.Executor;
-import org.springframework.stereotype.Component;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Component
public class TaskMonitorService implements MonitorService {
private static Logger LOG =
LoggerFactory.getLogger(HardwareMonitorService.class);
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
index 870b9ea65..cf8b9c00b 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/async/AsyncConcurrentComputationExecutor.scala
@@ -255,4 +255,8 @@ abstract class AsyncConcurrentComputationExecutor(override
val outputPrintLimit:
}
}
+ override def hasTaskRunning(): Boolean = {
+ getRunningTask > 0
+ }
+
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
index 0699ae779..a9447109d 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ConcurrentComputationExecutor.scala
@@ -26,6 +26,37 @@ abstract class ConcurrentComputationExecutor(override val
outputPrintLimit: Int
extends ComputationExecutor(outputPrintLimit)
with ConcurrentExecutor {
+ private val EXECUTOR_STATUS_LOCKER = new Object
+
+ override def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
+ if (isBusy) {
+ logger.error(
+ s"Executor is busy but still got new task ! Running task num :
${getRunningTask}"
+ )
+ }
+ if (getRunningTask >= getConcurrentLimit)
EXECUTOR_STATUS_LOCKER.synchronized {
+ if (getRunningTask >= getConcurrentLimit &&
NodeStatus.isIdle(getStatus)) {
+ logger.info(
+ s"running task: $getRunningTask > concurrent limit:
$getConcurrentLimit, now to mark engine to busy"
+ )
+ transition(NodeStatus.Busy)
+ }
+ }
+ logger.info(s"engineConnTask(${engineConnTask.getTaskId}) running task is
($getRunningTask) ")
+ val response = super.execute(engineConnTask)
+ if (getStatus == NodeStatus.Busy && getConcurrentLimit > getRunningTask) {
+ EXECUTOR_STATUS_LOCKER.synchronized {
+ if (getStatus == NodeStatus.Busy && getConcurrentLimit >
getRunningTask) {
+ logger.info(
+ s"running task($getRunningTask) < concurrent
limit:$getConcurrentLimit, now to mark engine to Unlock "
+ )
+ transition(NodeStatus.Unlock)
+ }
+ }
+ }
+ response
+ }
+
protected override def ensureOp[A](f: => A): A = f
override def afterExecute(
@@ -33,4 +64,8 @@ abstract class ConcurrentComputationExecutor(override val
outputPrintLimit: Int
executeResponse: ExecuteResponse
): Unit = {}
+ override def hasTaskRunning(): Boolean = {
+ getRunningTask > 0
+ }
+
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index 421968ecc..1b5713e56 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -26,7 +26,12 @@ import
org.apache.linkis.engineconn.common.execution.EngineConnExecution
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.core.hook.ShutdownHook
-import org.apache.linkis.engineconn.executor.entity.{Executor, LabelExecutor,
ResourceExecutor}
+import org.apache.linkis.engineconn.executor.entity.{
+ ConcurrentExecutor,
+ Executor,
+ LabelExecutor,
+ ResourceExecutor
+}
import
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.engineconn.executor.service.ManagerService
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
@@ -111,7 +116,10 @@ class AccessibleEngineConnExecution extends
EngineConnExecution with Logging {
) || NodeStatus.Idle.equals(accessibleExecutor.getStatus))
&& System.currentTimeMillis -
accessibleExecutor.getLastActivityTime > maxFreeTime
) {
- if (ifECCanMaintain()) {
+ if (isConcurrentExecutorHasTaskRunning(accessibleExecutor)) {
+ logger.info("ConcurrentExecutor has task running ec will not be
killed at this time")
+ accessibleExecutor.updateLastActivityTime()
+ } else if (isECCanMaintain()) {
logger.info("ec will not be killed at this time")
accessibleExecutor.updateLastActivityTime()
} else {
@@ -159,7 +167,15 @@ class AccessibleEngineConnExecution extends
EngineConnExecution with Logging {
}
}
- private def ifECCanMaintain(): Boolean = {
+ private def isConcurrentExecutorHasTaskRunning(executor: Executor): Boolean
= {
+ executor match {
+ case concurrentExecutor: ConcurrentExecutor =>
+ concurrentExecutor.hasTaskRunning()
+ case _ => false
+ }
+ }
+
+ private def isECCanMaintain(): Boolean = {
if (!isMaintainSupported()) return false
val engineTypeLabel =
LabelUtil.getEngineTypeLabel(EngineConnObject.getEngineCreationContext.getLabels())
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
index f448713f9..85189d400 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
@@ -50,4 +50,6 @@ trait ConcurrentExecutor extends Executor {
def killAll(): Unit
+ def hasTaskRunning(): Boolean
+
}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
index 7ffdcbfdb..535e09fb2 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/DefaultCodeJobParserTransform.scala
@@ -24,7 +24,7 @@ import org.apache.linkis.orchestrator.parser.Parser
import org.apache.linkis.orchestrator.plans.ast.{ASTContext, Job}
import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit
-import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import com.google.common.collect.Lists
@@ -63,7 +63,7 @@ class DefaultCodeJobParserTransform extends ParserTransform
with Logging {
def splitCode(codeJob: CodeJob): Array[CodeLogicalUnit] = {
val codeLogicalUnits = new ArrayBuffer[CodeLogicalUnit]
- codeJob.getCodeLogicalUnit.getCodes.asScala.foreach { code =>
+ codeJob.getCodeLogicalUnit.getCodes.foreach { code =>
code.split(codeJob.getCodeLogicalUnit.getSeparator).foreach { line =>
codeLogicalUnits.append(
new CodeLogicalUnit(
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
index e813a2190..6cc1c7e48 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
@@ -40,10 +40,7 @@ object ComputationOrchestratorConf {
"rm,sh,find,kill,python,for,source,hdfs,hadoop,spark-sql,spark-submit,pyspark,spark-shell,hive,yarn"
)
- val SHELL_WHITE_USAGE = CommonVars(
- "wds.linkis.shell.white.usage",
-
"sqoop,cd,ll,ls,echo,cat,tree,diff,who,grep,whoami,set,pwd,cut,file,head,less,if,while"
- )
+ val SHELL_WHITE_USAGE = CommonVars("wds.linkis.shell.white.usage", "cd,ls")
val SHELL_WHITE_USAGE_ENABLED =
CommonVars("wds.linkis.shell.white.usage.enabled", false)
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
index 9bfab20de..428db5f28 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/CodeExecTaskExecutorManager.scala
@@ -37,7 +37,7 @@ trait CodeExecTaskExecutorManager {
def askExecutor(execTask: CodeLogicalUnitExecTask, wait: Duration):
Option[CodeExecTaskExecutor]
- def addEngineConnTaskID(executor: CodeExecTaskExecutor): Unit
+ def addEngineConnTaskInfo(executor: CodeExecTaskExecutor): Unit
def getByEngineConnAndTaskId(
serviceInstance: ServiceInstance,
@@ -93,7 +93,7 @@ trait CodeExecTaskExecutorManager {
isSucceed: Boolean
): Unit
- def getAllInstanceToExecutorCache(): java.util.Map[ServiceInstance,
Array[CodeExecTaskExecutor]]
+ def getAllInstanceToExecutorCache(): java.util.Map[EngineConnTaskInfo,
CodeExecTaskExecutor]
def getAllExecTaskToExecutorCache(): java.util.Map[String,
CodeExecTaskExecutor]
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
index 03b0c31c6..8f95172fb 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
@@ -20,7 +20,7 @@ package org.apache.linkis.orchestrator.computation.execute
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.LinkisRetryException
import org.apache.linkis.common.log.LogUtils
-import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.entrance.LoadBalanceLabel
@@ -29,7 +29,11 @@ import
org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorCo
import
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
import org.apache.linkis.orchestrator.ecm.{EngineConnManager,
EngineConnManagerBuilder}
import org.apache.linkis.orchestrator.ecm.entity._
-import
org.apache.linkis.orchestrator.exception.OrchestratorLabelConflictException
+import org.apache.linkis.orchestrator.exception.{
+ OrchestratorErrorCodeSummary,
+ OrchestratorLabelConflictException,
+ OrchestratorRetryException
+}
import org.apache.linkis.orchestrator.listener.task.TaskLogEvent
import org.apache.commons.lang3.StringUtils
@@ -43,7 +47,7 @@ import scala.concurrent.duration.Duration
class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager
with Logging {
private val instanceToExecutors =
- new util.concurrent.ConcurrentHashMap[ServiceInstance,
Array[CodeExecTaskExecutor]]
+ new util.concurrent.ConcurrentHashMap[EngineConnTaskInfo,
CodeExecTaskExecutor]
private val execTaskToExecutor =
new util.concurrent.ConcurrentHashMap[String, CodeExecTaskExecutor]()
@@ -79,10 +83,8 @@ class DefaultCodeExecTaskExecutorManager extends
CodeExecTaskExecutorManager wit
case t: Throwable => throw t
} match {
case Some(e) =>
- logger.info(
- s"Finished to askExecutor for execId ${execTask
- .getIDInfo()}, wait
${ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)}"
- )
+ logger.info(s"Finished to askExecutor for execId ${execTask
+ .getIDInfo()}, wait ${System.currentTimeMillis() - startTime}")
executor = Option(e)
case _ =>
if (System.currentTimeMillis - startTime < wait.toMillis) {
@@ -184,14 +186,12 @@ class DefaultCodeExecTaskExecutorManager extends
CodeExecTaskExecutorManager wit
serviceInstance: ServiceInstance,
engineConnTaskId: String
): Option[CodeExecTaskExecutor] = {
- val maybeExecutors = instanceToExecutors.get(serviceInstance)
- if (null != maybeExecutors) {
- val executors = maybeExecutors.filter(_.getEngineConnTaskId ==
engineConnTaskId)
- if (null != executors && executors.nonEmpty) {
- return Some(executors(0))
- }
- }
- None
+
+ val maybeExecutors =
+ instanceToExecutors.get(EngineConnTaskInfo(serviceInstance,
engineConnTaskId))
+ if (null != maybeExecutors && maybeExecutors.getEngineConnTaskId ==
engineConnTaskId) {
+ Some(maybeExecutors)
+ } else None
}
override def getByExecTaskId(execTaskId: String):
Option[CodeExecTaskExecutor] = {
@@ -231,35 +231,30 @@ class DefaultCodeExecTaskExecutorManager extends
CodeExecTaskExecutorManager wit
private def removeExecutorFromInstanceToExecutors(executor:
CodeExecTaskExecutor): Unit = {
logger.debug(s"To delete codeExecTaskExecutor ${executor} from
instanceToExecutors")
- val maybeExecutors =
-
instanceToExecutors.get(executor.getEngineConnExecutor.getServiceInstance)
- if (null != maybeExecutors) {
- val executors =
- maybeExecutors.filter(_.getEngineConnTaskId !=
executor.getEngineConnTaskId)
- if (null != executors && executors.nonEmpty) {
-
instanceToExecutors.put(executor.getEngineConnExecutor.getServiceInstance,
executors)
- } else {
-
instanceToExecutors.remove(executor.getEngineConnExecutor.getServiceInstance)
- }
- }
+ val engineConnTaskInfo = EngineConnTaskInfo(
+ executor.getEngineConnExecutor.getServiceInstance,
+ executor.getEngineConnTaskId
+ )
+ instanceToExecutors.remove(engineConnTaskInfo)
+ execTaskToExecutor.remove(executor.getExecTaskId)
logger.info(
s"To delete exec task ${executor.getExecTask.getIDInfo()} and
CodeExecTaskExecutor ${executor.getEngineConnExecutor.getServiceInstance}
relation"
)
- execTaskToExecutor.remove(executor.getExecTaskId)
}
- override def addEngineConnTaskID(executor: CodeExecTaskExecutor): Unit = {
- if (null == executor) return
- execTaskToExecutor.put(executor.getExecTaskId, executor)
- logger.info(s"To add codeExecTaskExecutor $executor to
instanceToExecutors")
- val executors = instanceToExecutors.getOrDefault(
- executor.getEngineConnExecutor.getServiceInstance,
- Array.empty[CodeExecTaskExecutor]
- )
- instanceToExecutors.put(
+ override def addEngineConnTaskInfo(executor: CodeExecTaskExecutor): Unit = {
+ if (null == executor || StringUtils.isBlank(executor.getExecTaskId)) {
+ throw new OrchestratorRetryException(
+ OrchestratorErrorCodeSummary.EXECUTION_ERROR_CODE,
+ "Failed to store task information"
+ )
+ }
+ val engineConnTaskInfo = EngineConnTaskInfo(
executor.getEngineConnExecutor.getServiceInstance,
- executors.+:(executor)
+ executor.getEngineConnTaskId
)
+ instanceToExecutors.put(engineConnTaskInfo, executor)
+ logger.info(s"Finished To add codeExecTaskExecutor $executor to
instanceToExecutors")
}
private def getEngineConnManager(labels: util.List[Label[_]]):
EngineConnManager = {
@@ -270,8 +265,8 @@ class DefaultCodeExecTaskExecutorManager extends
CodeExecTaskExecutorManager wit
defaultEngineConnManager
}
- override def getAllInstanceToExecutorCache()
- : util.Map[ServiceInstance, Array[CodeExecTaskExecutor]] =
instanceToExecutors
+ override def getAllInstanceToExecutorCache(): util.Map[EngineConnTaskInfo,
CodeExecTaskExecutor] =
+ instanceToExecutors
override def getAllExecTaskToExecutorCache(): util.Map[String,
CodeExecTaskExecutor] =
execTaskToExecutor
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/EngineConnTaskInfo.scala
similarity index 58%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
copy to
linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/EngineConnTaskInfo.scala
index f448713f9..e991ce38e 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/entity/Executor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/EngineConnTaskInfo.scala
@@ -15,39 +15,8 @@
* limitations under the License.
*/
-package org.apache.linkis.engineconn.executor.entity
+package org.apache.linkis.orchestrator.computation.execute
-import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.common.ServiceInstance
-trait Executor extends Logging {
-
- def getId: String
-
- def init(): Unit
-
- def tryReady(): Boolean
-
- def tryShutdown(): Boolean
-
- def tryFailed(): Boolean
-
- def trySucceed(): Boolean
-
- /**
- * 仅用于Kill Executor EngineConn kill 在AccessibleService
- */
- def close(): Unit = {
- logger.warn(s"Executor($getId) exit by close.")
- }
-
- def isClosed: Boolean
-
-}
-
-trait ConcurrentExecutor extends Executor {
-
- def getConcurrentLimit: Int
-
- def killAll(): Unit
-
-}
+case class EngineConnTaskInfo(ecServiceInstance: ServiceInstance, ecTaskId:
String)
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
index 14bf62d47..5afb75b4a 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor.scala
@@ -18,9 +18,10 @@
package org.apache.linkis.orchestrator.computation.monitor
import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
-import org.apache.linkis.governance.common.entity.NodeExistStatus
+import org.apache.linkis.governance.common.entity.{ExecutionNodeStatus,
NodeExistStatus}
import org.apache.linkis.governance.common.protocol.engineconn.{
RequestEngineStatusBatch,
ResponseEngineStatusBatch
@@ -29,15 +30,19 @@ import
org.apache.linkis.governance.common.utils.GovernanceConstant
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.protocol.node.{RequestNodeStatus,
ResponseNodeStatus}
import
org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
-import org.apache.linkis.orchestrator.computation.execute.CodeExecTaskExecutor
-import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor
+import
org.apache.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor,
EngineConnTaskInfo}
+import org.apache.linkis.orchestrator.listener.task.{
+ TaskErrorResponseEvent,
+ TaskLogEvent,
+ TaskStatusEvent
+}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.server.{toJavaMap, BDPJettyServerHelper}
import java.util
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters.{asScalaBufferConverter,
mapAsScalaMapConverter}
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -47,43 +52,46 @@ object EngineConnMonitor extends Logging {
ComputationOrchestratorConf.ENGINECONN_LASTUPDATE_TIMEOUT.getValue.toLong
private[linkis] def addEngineExecutorStatusMonitor(
- engineConnExecutorCache: util.Map[ServiceInstance,
Array[CodeExecTaskExecutor]],
- endJobByEngineInstance: ServiceInstance => Unit
+ engineConnExecutorCache: util.Map[EngineConnTaskInfo,
CodeExecTaskExecutor]
): Unit = {
val task = new Runnable {
override def run(): Unit = Utils.tryAndWarn {
val startTime = System.currentTimeMillis()
- val engineExecutorCache = engineConnExecutorCache
- val unActivityEngines = new mutable.HashSet[EngineConnExecutor]()
- val allTaskExecutors = engineExecutorCache.values().iterator()
- while (allTaskExecutors.hasNext) {
- val taskExecutors = allTaskExecutors.next()
- taskExecutors.foreach(executor => {
- val interval = startTime -
executor.getEngineConnExecutor.getLastUpdateTime()
- if (interval > ENGINECONN_LASTUPDATE_TIMEOUT) {
- unActivityEngines.add(executor.getEngineConnExecutor)
- }
- })
- }
- if (null != unActivityEngines && unActivityEngines.nonEmpty) {
- logger.info(s"There are ${unActivityEngines.size} unActivity
engines.")
- val engineList = new util.ArrayList[ServiceInstance]()
- val instanceAndExecutors =
- new mutable.HashMap[ServiceInstance, Array[CodeExecTaskExecutor]]()
- unActivityEngines.foreach(engine => {
- engineList.add(engine.getServiceInstance)
- val executors =
engineConnExecutorCache.get(engine.getServiceInstance)
- if (null != executors) {
- instanceAndExecutors.put(engine.getServiceInstance, executors)
- }
- if (engineList.size() >=
GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT) {
- queryEngineStatusAndHandle(instanceAndExecutors, engineList,
endJobByEngineInstance)
+ val unActivityExecutors =
+ new mutable.HashMap[ServiceInstance,
ArrayBuffer[CodeExecTaskExecutor]]()
+ val allTaskExecutors = new ArrayBuffer[CodeExecTaskExecutor]()
+ allTaskExecutors.appendAll(engineConnExecutorCache.values().asScala)
+ allTaskExecutors
+ .filter(executor =>
+ (startTime - executor.getEngineConnExecutor
+ .getLastUpdateTime()) > ENGINECONN_LASTUPDATE_TIMEOUT
+ )
+ .foreach { executor =>
+ val executors = unActivityExecutors.getOrElseUpdate(
+ executor.getEngineConnExecutor.getServiceInstance,
+ new ArrayBuffer[CodeExecTaskExecutor]()
+ )
+ executors.append(executor)
+ }
+
+ if (unActivityExecutors.nonEmpty) {
+ logger.info("There are {} unActivity engineConn.",
unActivityExecutors.size)
+
+ if (unActivityExecutors.size >
GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT) {
+ queryEngineStatusAndHandle(unActivityExecutors,
unActivityExecutors.keys.toList.asJava)
+ } else {
+ val engineList = new util.ArrayList[ServiceInstance]()
+ unActivityExecutors.keys.foreach(serviceInstance => {
+ engineList.add(serviceInstance)
+ if (engineList.size() >=
GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT) {
+ queryEngineStatusAndHandle(unActivityExecutors, engineList)
+ engineList.clear()
+ }
+ })
+ if (!engineList.isEmpty) {
+ queryEngineStatusAndHandle(unActivityExecutors, engineList)
engineList.clear()
}
- })
- if (!engineList.isEmpty) {
- queryEngineStatusAndHandle(instanceAndExecutors, engineList,
endJobByEngineInstance)
- engineList.clear()
}
}
val endTime = System.currentTimeMillis()
@@ -106,9 +114,8 @@ object EngineConnMonitor extends Logging {
}
private def queryEngineStatusAndHandle(
- engineConnExecutorCache: mutable.HashMap[ServiceInstance,
Array[CodeExecTaskExecutor]],
- engineList: util.List[ServiceInstance],
- endJobByEngineInstance: ServiceInstance => Unit
+ unActivityExecutors: mutable.HashMap[ServiceInstance,
ArrayBuffer[CodeExecTaskExecutor]],
+ engineList: util.List[ServiceInstance]
): Unit = {
val requestEngineStatus = RequestEngineStatusBatch(engineList)
Utils.tryAndError {
@@ -117,11 +124,14 @@ object EngineConnMonitor extends Logging {
.ask(requestEngineStatus) match {
case response: ResponseEngineStatusBatch =>
if (null != response.msg) {
- logger.info(s"ResponseEngineStatusBatch msg : ${response.msg}")
+ logger.info("ResponseEngineStatusBatch msg: {}", response.msg)
}
if (response.engineStatus.size() !=
requestEngineStatus.engineList.size()) {
- logger.warn(s"ResponseEngineStatusBatch engines size :
${response.engineStatus
- .size()} is not euqal requet :
${requestEngineStatus.engineList.size()}.")
+ logger.warn(
+ "ResponseEngineStatusBatch engines size: {} is not equal
request: {}.",
+ response.engineStatus.size(): Any,
+ requestEngineStatus.engineList.size(): Any
+ )
val unKnownEngines = new ArrayBuffer[ServiceInstance]()
requestEngineStatus.engineList.asScala.foreach(instance => {
if (!response.engineStatus.containsKey(instance)) {
@@ -130,14 +140,15 @@ object EngineConnMonitor extends Logging {
}
})
val instances = unKnownEngines.map(_.getInstance).mkString(",")
- logger.warn(s"These engine instances cannot be found in manager :
${instances}")
+ logger.warn("These engine instances cannot be found in manager :
{}", instances)
}
response.engineStatus.asScala.foreach(status =>
- dealWithEngineStatus(status, engineConnExecutorCache,
endJobByEngineInstance)
+ dealWithEngineStatus(status, unActivityExecutors)
)
case _ =>
logger.warn(
- s"Invalid response. request :
${BDPJettyServerHelper.gson.toJson(requestEngineStatus)}"
+ "Invalid response. request : {}",
+ BDPJettyServerHelper.gson.toJson(requestEngineStatus)
)
}
}
@@ -145,28 +156,26 @@ object EngineConnMonitor extends Logging {
private def dealWithEngineStatus(
status: (ServiceInstance, NodeExistStatus),
- engineConnExecutorCache: mutable.HashMap[ServiceInstance,
Array[CodeExecTaskExecutor]],
- endJobByEngineInstance: ServiceInstance => Unit
+ unActivityExecutors: mutable.HashMap[ServiceInstance,
ArrayBuffer[CodeExecTaskExecutor]]
): Unit = {
status._2 match {
case NodeExistStatus.UnExist =>
- logger.warn(s"Engine ${status._1} is Failed, now go to clear its
task.")
- endJobByEngineInstance(status._1)
+ logger.warn("Engine {} is Failed, now go to clear its task.",
status._1)
+ killTask(unActivityExecutors.get(status._1))
case NodeExistStatus.Exist | NodeExistStatus.Unknown =>
- val engineConnExecutor =
engineConnExecutorCache.getOrDefault(status._1, null)
+ val engineConnExecutor = unActivityExecutors.getOrDefault(status._1,
null)
if (null != engineConnExecutor) {
Utils.tryCatch {
- // todo check - only for engine with accessible executor
val requestNodeStatus = new RequestNodeStatus
Sender.getSender(status._1).ask(requestNodeStatus) match {
case rs: ResponseNodeStatus =>
if (NodeStatus.isCompleted(rs.getNodeStatus)) {
- endJobByEngineInstance(status._1)
+ killTask(unActivityExecutors.get(status._1))
} else {
if (logger.isDebugEnabled()) {
- logger.debug(s"Will update
engineConnExecutor(${status._1}) lastupdated time")
+ logger.debug("Will update engineConnExecutor({})
lastupdated time", status._1)
}
- updateExecutorActivityTime(status._1,
engineConnExecutorCache)
+ updateExecutorActivityTime(status._1, unActivityExecutors)
}
case _ =>
logger.warn(
@@ -175,27 +184,58 @@ object EngineConnMonitor extends Logging {
}
} { t: Throwable =>
logger.error(s"Failed to get status of engineConn : ${status._1},
now end the job. ", t)
- endJobByEngineInstance(status._1)
+ killTask(unActivityExecutors.get(status._1))
}
}
}
}
+ private def killTask(mayExecutors:
Option[ArrayBuffer[CodeExecTaskExecutor]]): Unit = {
+ if (mayExecutors.isEmpty) {
+ logger.error("executor is not Defined")
+ return
+ }
+ val executors = mayExecutors.get
+ executors.foreach { executor =>
+ val execTask = executor.getExecTask
+ Utils.tryAndError {
+ logger.warn(
+ s"Will kill task ${execTask.getIDInfo()} because the engine
${executor.getEngineConnExecutor.getServiceInstance.toString} quited
unexpectedly."
+ )
+ val errLog = LogUtils.generateERROR(
+ s"Your job : ${execTask.getIDInfo()} was failed because the engine
quitted unexpectedly(任务${execTask
+ .getIDInfo()}失败," +
+ s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+ )
+ val logEvent = TaskLogEvent(execTask, errLog)
+ execTask.getPhysicalContext.pushLog(logEvent)
+ val errorResponseEvent = TaskErrorResponseEvent(
+ execTask,
+ "task failed,Engine quitted
unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
+ )
+ execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
+ val statusEvent = TaskStatusEvent(execTask, ExecutionNodeStatus.Failed)
+ execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
+ }
+ }
+ }
+
private def updateExecutorActivityTime(
serviceInstance: ServiceInstance,
- engineConnExecutorCache: mutable.HashMap[ServiceInstance,
Array[CodeExecTaskExecutor]]
+ engineConnExecutorCache: mutable.HashMap[ServiceInstance,
ArrayBuffer[CodeExecTaskExecutor]]
): Unit = {
if (null != serviceInstance) {
- val taskExecutorList =
engineConnExecutorCache.getOrDefault(serviceInstance, null)
- if (null != taskExecutorList) {
- taskExecutorList.foreach(executor =>
+ val executors = engineConnExecutorCache.getOrDefault(serviceInstance,
null)
+ if (null != executors) {
+ executors.foreach { executor =>
if
(executor.getEngineConnExecutor.getServiceInstance.equals(serviceInstance)) {
executor.getEngineConnExecutor.updateLastUpdateTime()
}
- )
+ }
} else {
logger.warn(
- s"EngineConnExecutor ${serviceInstance.toString} cannot be found in
engineConnExecutorCache"
+ "EngineConnExecutor {} cannot be found in engineConnExecutorCache",
+ serviceInstance.toString
)
}
}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index e7f98c5c6..86c1cad86 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -114,7 +114,7 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask],
children: Array[ExecTask
response match {
case SubmitResponse(engineConnExecId) =>
codeExecutor.setEngineConnTaskId(engineConnExecId)
- codeExecTaskExecutorManager.addEngineConnTaskID(codeExecutor)
+ codeExecTaskExecutorManager.addEngineConnTaskInfo(codeExecutor)
val infoMap = new util.HashMap[String, Object]
infoMap.put(
TaskConstant.ENGINE_INSTANCE,
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
index f1e17703d..521205927 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
@@ -45,38 +45,7 @@ class ComputationTaskExecutionReceiver extends
TaskExecutionReceiver with Loggin
@PostConstruct
private def init(): Unit = {
EngineConnMonitor.addEngineExecutorStatusMonitor(
- codeExecTaskExecutorManager.getAllInstanceToExecutorCache(),
- failedEngineServiceInstance => {
- val taskToExecutorCache =
codeExecTaskExecutorManager.getAllExecTaskToExecutorCache()
- val allExecutor = taskToExecutorCache.values().iterator()
- while (allExecutor.hasNext) {
- val executor = allExecutor.next()
- if (
-
executor.getEngineConnExecutor.getServiceInstance.equals(failedEngineServiceInstance)
- ) {
- val execTask = executor.getExecTask
- Utils.tryAndError {
- logger.warn(
- s"Will kill task ${execTask.getIDInfo()} because the engine
${executor.getEngineConnExecutor.getServiceInstance.toString} quited
unexpectedly."
- )
- val errLog = LogUtils.generateERROR(
- s"Your job : ${execTask.getIDInfo()} was failed because the
engine quitted unexpectedly(任务${execTask
- .getIDInfo()}失败," +
- s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
- )
- val logEvent = TaskLogEvent(execTask, errLog)
- execTask.getPhysicalContext.pushLog(logEvent)
- val errorResponseEvent = TaskErrorResponseEvent(
- execTask,
- "task failed,Engine quitted
unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM)."
- )
-
execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
- val statusEvent = TaskStatusEvent(execTask,
ExecutionNodeStatus.Failed)
- execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
- }
- }
- }
- }
+ codeExecTaskExecutorManager.getAllInstanceToExecutorCache()
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]