This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 8c1b15d81eeff5cf488a8d1fd4fc400b80b924f5
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 20:34:15 2023 +0800

    Added logs for the ask engine stage
---
 .../execute/DefaultCodeExecTaskExecutorManager.scala  |  3 +--
 .../physical/CodeLogicalUnitExecTask.scala            |  1 -
 .../ecm/ComputationEngineConnManager.scala            | 19 ++++++++++++++++---
 .../linkis/orchestrator/ecm/EngineConnManager.scala   | 16 ++++++++++++----
 .../ecm/LoadBalanceLabelEngineConnManager.scala       |  9 ++++++---
 5 files changed, 35 insertions(+), 13 deletions(-)

diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
index 8f95172fb..c370c6611 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
@@ -41,7 +41,6 @@ import org.apache.commons.lang3.StringUtils
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.concurrent.duration.Duration
 
 class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager 
with Logging {
@@ -132,7 +131,7 @@ class DefaultCodeExecTaskExecutorManager extends 
CodeExecTaskExecutorManager wit
           .getIDInfo()} mark id is ${mark.getMarkId()}, it may take several 
seconds, please wait")
       )
     )
-    val engineConnExecutor = 
engineConnManager.getAvailableEngineConnExecutor(mark)
+    val engineConnExecutor = 
engineConnManager.getAvailableEngineConnExecutor(mark, execTask)
     if (null == engineConnExecutor) {
       return null
     }
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index 508b6fb8f..82e75c0bd 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -102,7 +102,6 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], 
children: Array[ExecTask
     if (executor.isDefined && !isCanceled) {
       val requestTask = toRequestTask
       val codeExecutor = executor.get
-
       val msg = if (codeExecutor.getEngineConnExecutor.isReuse()) {
         s"Succeed to reuse ec : 
${codeExecutor.getEngineConnExecutor.getServiceInstance}"
       } else {
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index c937deed6..1ae1d8459 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.orchestrator.ecm
 
 import org.apache.linkis.common.ServiceInstance
 import org.apache.linkis.common.exception.LinkisRetryException
+import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.manager.common.entity.node.EngineNode
@@ -29,6 +30,7 @@ import org.apache.linkis.manager.common.protocol.engine.{
   EngineCreateSuccess
 }
 import org.apache.linkis.manager.label.constant.LabelKeyConstant
+import 
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
 import org.apache.linkis.orchestrator.ecm.cache.EngineAsyncResponseCache
 import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
 import org.apache.linkis.orchestrator.ecm.entity.{DefaultMark, Mark, MarkReq, 
Policy}
@@ -38,6 +40,7 @@ import org.apache.linkis.orchestrator.ecm.service.impl.{
   ComputationConcurrentEngineConnExecutor,
   ComputationEngineConnExecutor
 }
+import org.apache.linkis.orchestrator.listener.task.TaskLogEvent
 import org.apache.linkis.rpc.Sender
 
 import org.apache.commons.lang3.exception.ExceptionUtils
@@ -77,7 +80,8 @@ class ComputationEngineConnManager extends 
AbstractEngineConnManager with Loggin
 
   override protected def askEngineConnExecutor(
       engineAskRequest: EngineAskRequest,
-      mark: Mark
+      mark: Mark,
+      execTask: CodeLogicalUnitExecTask
   ): EngineConnExecutor = {
     engineAskRequest.setTimeOut(getEngineConnApplyTime)
     var count = getEngineConnApplyAttempts()
@@ -86,7 +90,8 @@ class ComputationEngineConnManager extends 
AbstractEngineConnManager with Loggin
       count = count - 1
       val start = System.currentTimeMillis()
       try {
-        val (engineNode, reuse) = getEngineNodeAskManager(engineAskRequest, 
mark)
+        val (engineNode, reuse) =
+          getEngineNodeAskManager(engineAskRequest, mark, execTask)
         if (null != engineNode) {
           val engineConnExecutor =
             if (
@@ -110,6 +115,9 @@ class ComputationEngineConnManager extends 
AbstractEngineConnManager with Loggin
             s"${mark.getMarkId()} Failed to askEngineAskRequest time taken 
($taken), ${t.getMessage}"
           )
           retryException = t
+          // add isCrossClusterRetryException flag
+          engineAskRequest.getProperties.put("isCrossClusterRetryException", 
"true")
+
         case t: Throwable =>
           val taken = 
ByteTimeUtils.msDurationToString(System.currentTimeMillis - start)
           logger.warn(s"${mark.getMarkId()} Failed to askEngineAskRequest time 
taken ($taken)")
@@ -128,7 +136,8 @@ class ComputationEngineConnManager extends 
AbstractEngineConnManager with Loggin
 
   private def getEngineNodeAskManager(
       engineAskRequest: EngineAskRequest,
-      mark: Mark
+      mark: Mark,
+      execTask: CodeLogicalUnitExecTask
   ): (EngineNode, Boolean) = {
     val response = Utils.tryCatch(getManagerSender().ask(engineAskRequest)) { 
t: Throwable =>
       val baseMsg = s"mark ${mark.getMarkId()}  failed to ask linkis Manager 
Can be retried "
@@ -143,6 +152,7 @@ class ComputationEngineConnManager extends 
AbstractEngineConnManager with Loggin
           throw t
       }
     }
+
     response match {
       case engineNode: EngineNode =>
         logger.debug(s"Succeed to reuse engineNode $engineNode mark 
${mark.getMarkId()}")
@@ -156,6 +166,9 @@ class ComputationEngineConnManager extends 
AbstractEngineConnManager with Loggin
             engineAskAsyncResponse.getManagerInstance
           ): _*
         )
+        execTask.getPhysicalContext.pushLog(
+          TaskLogEvent(execTask, LogUtils.generateInfo(s"Request 
LinkisManager:${response}"))
+        )
         cacheMap.getAndRemove(
           engineAskAsyncResponse.getId,
           Duration(engineAskRequest.getTimeOut + 100000, TimeUnit.MILLISECONDS)
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
index de996a353..416d1363e 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
@@ -21,6 +21,7 @@ import org.apache.linkis.common.ServiceInstance
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.manager.common.protocol.engine.EngineAskRequest
 import org.apache.linkis.manager.label.constant.LabelKeyConstant
+import 
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
 import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
 import org.apache.linkis.orchestrator.ecm.entity.{Mark, MarkReq, Policy}
 import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException
@@ -59,7 +60,10 @@ trait EngineConnManager {
    * @param mark
    * @return
    */
-  def getAvailableEngineConnExecutor(mark: Mark): EngineConnExecutor
+  def getAvailableEngineConnExecutor(
+      mark: Mark,
+      execTask: CodeLogicalUnitExecTask
+  ): EngineConnExecutor
 
   /**
    * Remove the engineConn related to the Mark Release lock and other 
information
@@ -121,7 +125,10 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
 
   override def getMarkCache(): util.Map[Mark, util.List[ServiceInstance]] = 
markCache
 
-  override def getAvailableEngineConnExecutor(mark: Mark): EngineConnExecutor 
= {
+  override def getAvailableEngineConnExecutor(
+      mark: Mark,
+      execTask: CodeLogicalUnitExecTask
+  ): EngineConnExecutor = {
     logger.info(s"mark ${mark.getMarkId()} start to  
getAvailableEngineConnExecutor")
     if (null != mark) {
       tryReuseEngineConnExecutor(mark) match {
@@ -129,7 +136,7 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
         case None =>
       }
       val engineConnExecutor =
-        askEngineConnExecutor(mark.getMarkReq.createEngineConnAskReq(), mark)
+        askEngineConnExecutor(mark.getMarkReq.createEngineConnAskReq(), mark, 
execTask)
       engineConnExecutor.useEngineConn
       saveToMarkCache(mark, engineConnExecutor)
       logger.debug(
@@ -233,7 +240,8 @@ abstract class AbstractEngineConnManager extends 
EngineConnManager with Logging
 
   protected def askEngineConnExecutor(
       engineAskRequest: EngineAskRequest,
-      mark: Mark
+      mark: Mark,
+      execTask: CodeLogicalUnitExecTask
   ): EngineConnExecutor
 
   override def releaseMark(mark: Mark): Unit = {
diff --git 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
index 622c8813b..d36b548cd 100644
--- 
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
+++ 
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.manager.label.constant.LabelKeyConstant
 import org.apache.linkis.manager.label.entity.engine.ReuseExclusionLabel
 import org.apache.linkis.manager.label.entity.entrance.{BindEngineLabel, 
LoadBalanceLabel}
+import 
org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
 import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
 import org.apache.linkis.orchestrator.ecm.entity._
 import org.apache.linkis.orchestrator.ecm.exception.ECMPluginErrorException
@@ -153,8 +154,10 @@ class LoadBalanceLabelEngineConnManager extends 
ComputationEngineConnManager wit
     }
   }
 
-  override def getAvailableEngineConnExecutor(mark: Mark): EngineConnExecutor 
= {
-
+  override def getAvailableEngineConnExecutor(
+      mark: Mark,
+      execTask: CodeLogicalUnitExecTask
+  ): EngineConnExecutor = {
     if (null != mark && getMarkCache().containsKey(mark)) {
       tryReuseEngineConnExecutor(mark) match {
         case Some(engineConnExecutor) =>
@@ -174,7 +177,7 @@ class LoadBalanceLabelEngineConnManager extends 
ComputationEngineConnManager wit
           reuseExclusionLabel.getValue
         )
       }
-      val engineConnExecutor = askEngineConnExecutor(engineConnAskReq, mark)
+      val engineConnExecutor = askEngineConnExecutor(engineConnAskReq, mark, 
execTask)
       saveToMarkCache(mark, engineConnExecutor)
       logger.debug(
         s"mark ${mark.getMarkId()} Finished to  getAvailableEngineConnExecutor 
by create"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to