This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 8c1b15d81eeff5cf488a8d1fd4fc400b80b924f5 Author: peacewong <[email protected]> AuthorDate: Tue Oct 10 20:34:15 2023 +0800 Added logs for the ask engine stage --- .../execute/DefaultCodeExecTaskExecutorManager.scala | 3 +-- .../physical/CodeLogicalUnitExecTask.scala | 1 - .../ecm/ComputationEngineConnManager.scala | 19 ++++++++++++++++--- .../linkis/orchestrator/ecm/EngineConnManager.scala | 16 ++++++++++++---- .../ecm/LoadBalanceLabelEngineConnManager.scala | 9 ++++++--- 5 files changed, 35 insertions(+), 13 deletions(-) diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala index 8f95172fb..c370c6611 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala @@ -41,7 +41,6 @@ import org.apache.commons.lang3.StringUtils import java.util import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.concurrent.duration.Duration class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager with Logging { @@ -132,7 +131,7 @@ class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager wit .getIDInfo()} mark id is ${mark.getMarkId()}, it may take several seconds, please wait") ) ) - val engineConnExecutor = engineConnManager.getAvailableEngineConnExecutor(mark) + val engineConnExecutor = engineConnManager.getAvailableEngineConnExecutor(mark, execTask) if (null == engineConnExecutor) { return null } diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala index 508b6fb8f..82e75c0bd 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala @@ -102,7 +102,6 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask if (executor.isDefined && !isCanceled) { val requestTask = toRequestTask val codeExecutor = executor.get - val msg = if (codeExecutor.getEngineConnExecutor.isReuse()) { s"Succeed to reuse ec : ${codeExecutor.getEngineConnExecutor.getServiceInstance}" } else { diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala index c937deed6..1ae1d8459 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala @@ -19,6 +19,7 @@ package org.apache.linkis.orchestrator.ecm import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.exception.LinkisRetryException +import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import 
org.apache.linkis.manager.common.entity.node.EngineNode @@ -29,6 +30,7 @@ import org.apache.linkis.manager.common.protocol.engine.{ EngineCreateSuccess } import org.apache.linkis.manager.label.constant.LabelKeyConstant +import org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask import org.apache.linkis.orchestrator.ecm.cache.EngineAsyncResponseCache import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf import org.apache.linkis.orchestrator.ecm.entity.{DefaultMark, Mark, MarkReq, Policy} @@ -38,6 +40,7 @@ import org.apache.linkis.orchestrator.ecm.service.impl.{ ComputationConcurrentEngineConnExecutor, ComputationEngineConnExecutor } +import org.apache.linkis.orchestrator.listener.task.TaskLogEvent import org.apache.linkis.rpc.Sender import org.apache.commons.lang3.exception.ExceptionUtils @@ -77,7 +80,8 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin override protected def askEngineConnExecutor( engineAskRequest: EngineAskRequest, - mark: Mark + mark: Mark, + execTask: CodeLogicalUnitExecTask ): EngineConnExecutor = { engineAskRequest.setTimeOut(getEngineConnApplyTime) var count = getEngineConnApplyAttempts() @@ -86,7 +90,8 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin count = count - 1 val start = System.currentTimeMillis() try { - val (engineNode, reuse) = getEngineNodeAskManager(engineAskRequest, mark) + val (engineNode, reuse) = + getEngineNodeAskManager(engineAskRequest, mark, execTask) if (null != engineNode) { val engineConnExecutor = if ( @@ -110,6 +115,9 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin s"${mark.getMarkId()} Failed to askEngineAskRequest time taken ($taken), ${t.getMessage}" ) retryException = t + // add isCrossClusterRetryException flag + engineAskRequest.getProperties.put("isCrossClusterRetryException", "true") + case t: Throwable => val taken = ByteTimeUtils.msDurationToString(System.currentTimeMillis 
- start) logger.warn(s"${mark.getMarkId()} Failed to askEngineAskRequest time taken ($taken)") @@ -128,7 +136,8 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin private def getEngineNodeAskManager( engineAskRequest: EngineAskRequest, - mark: Mark + mark: Mark, + execTask: CodeLogicalUnitExecTask ): (EngineNode, Boolean) = { val response = Utils.tryCatch(getManagerSender().ask(engineAskRequest)) { t: Throwable => val baseMsg = s"mark ${mark.getMarkId()} failed to ask linkis Manager Can be retried " @@ -143,6 +152,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin throw t } } + response match { case engineNode: EngineNode => logger.debug(s"Succeed to reuse engineNode $engineNode mark ${mark.getMarkId()}") @@ -156,6 +166,9 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin engineAskAsyncResponse.getManagerInstance ): _* ) + execTask.getPhysicalContext.pushLog( + TaskLogEvent(execTask, LogUtils.generateInfo(s"Request LinkisManager:${response}")) + ) cacheMap.getAndRemove( engineAskAsyncResponse.getId, Duration(engineAskRequest.getTimeOut + 100000, TimeUnit.MILLISECONDS) diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala index de996a353..416d1363e 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala @@ -21,6 +21,7 @@ import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.manager.common.protocol.engine.EngineAskRequest import 
org.apache.linkis.manager.label.constant.LabelKeyConstant +import org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq, Policy} import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException @@ -59,7 +60,10 @@ trait EngineConnManager { * @param mark * @return */ - def getAvailableEngineConnExecutor(mark: Mark): EngineConnExecutor + def getAvailableEngineConnExecutor( + mark: Mark, + execTask: CodeLogicalUnitExecTask + ): EngineConnExecutor /** * Remove the engineConn related to the Mark Release lock and other information @@ -121,7 +125,10 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging override def getMarkCache(): util.Map[Mark, util.List[ServiceInstance]] = markCache - override def getAvailableEngineConnExecutor(mark: Mark): EngineConnExecutor = { + override def getAvailableEngineConnExecutor( + mark: Mark, + execTask: CodeLogicalUnitExecTask + ): EngineConnExecutor = { logger.info(s"mark ${mark.getMarkId()} start to getAvailableEngineConnExecutor") if (null != mark) { tryReuseEngineConnExecutor(mark) match { @@ -129,7 +136,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging case None => } val engineConnExecutor = - askEngineConnExecutor(mark.getMarkReq.createEngineConnAskReq(), mark) + askEngineConnExecutor(mark.getMarkReq.createEngineConnAskReq(), mark, execTask) engineConnExecutor.useEngineConn saveToMarkCache(mark, engineConnExecutor) logger.debug( @@ -233,7 +240,8 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging protected def askEngineConnExecutor( engineAskRequest: EngineAskRequest, - mark: Mark + mark: Mark, + execTask: CodeLogicalUnitExecTask ): EngineConnExecutor override def releaseMark(mark: Mark): Unit = { diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala index 622c8813b..d36b548cd 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala @@ -22,6 +22,7 @@ import org.apache.linkis.common.utils.Logging import org.apache.linkis.manager.label.constant.LabelKeyConstant import org.apache.linkis.manager.label.entity.engine.ReuseExclusionLabel import org.apache.linkis.manager.label.entity.entrance.{BindEngineLabel, LoadBalanceLabel} +import org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf import org.apache.linkis.orchestrator.ecm.entity._ import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException @@ -153,8 +154,10 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit } } - override def getAvailableEngineConnExecutor(mark: Mark): EngineConnExecutor = { - + override def getAvailableEngineConnExecutor( + mark: Mark, + execTask: CodeLogicalUnitExecTask + ): EngineConnExecutor = { if (null != mark && getMarkCache().containsKey(mark)) { tryReuseEngineConnExecutor(mark) match { case Some(engineConnExecutor) => @@ -174,7 +177,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit reuseExclusionLabel.getValue ) } - val engineConnExecutor = askEngineConnExecutor(engineConnAskReq, mark) + val engineConnExecutor = askEngineConnExecutor(engineConnAskReq, mark, execTask) saveToMarkCache(mark, engineConnExecutor) logger.debug( s"mark 
${mark.getMarkId()} Finished to getAvailableEngineConnExecutor by create" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org For additional commands, e-mail: commits-help@linkis.apache.org
