This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new de122dc26 linkis-computation-orchestrator - add log of engine reusing
(#3852)
de122dc26 is described below
commit de122dc26767faed78d3cc80264a70cb8df470d3
Author: 成彬彬 <[email protected]>
AuthorDate: Tue Nov 22 11:53:07 2022 +0800
linkis-computation-orchestrator - add log of engine reusing (#3852)
* linkis-computation-orchestrator - add log of engine reusing
---
.../computation/physical/CodeLogicalUnitExecTask.scala | 6 ++++++
.../orchestrator/ecm/ComputationEngineConnManager.scala | 13 +++++++------
.../orchestrator/ecm/service/EngineConnExecutor.scala | 13 +++++++++++++
3 files changed, 26 insertions(+), 6 deletions(-)
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index a741b3643..e7f98c5c6 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -97,6 +97,12 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask],
children: Array[ExecTask
if (executor.isDefined && !isCanceled) {
val requestTask = toRequestTask
val codeExecutor = executor.get
+ val msg = if (codeExecutor.getEngineConnExecutor.isReuse()) {
+ s"Succeed to reuse ec :
${codeExecutor.getEngineConnExecutor.getServiceInstance}"
+ } else {
+ s"Succeed to create new ec :
${codeExecutor.getEngineConnExecutor.getServiceInstance}"
+ }
+ getPhysicalContext.pushLog(TaskLogEvent(this,
LogUtils.generateInfo(msg)))
val response =
Utils.tryCatch(codeExecutor.getEngineConnExecutor.execute(requestTask)) {
t: Throwable =>
logger.error(
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index 29b4c14a8..2c97e36ce 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -86,7 +86,7 @@ class ComputationEngineConnManager extends
AbstractEngineConnManager with Loggin
count = count - 1
val start = System.currentTimeMillis()
try {
- val engineNode = getEngineNodeAskManager(engineAskRequest, mark)
+ val (engineNode, reuse) = getEngineNodeAskManager(engineAskRequest,
mark)
if (null != engineNode) {
val engineConnExecutor =
if (
@@ -100,6 +100,7 @@ class ComputationEngineConnManager extends
AbstractEngineConnManager with Loggin
if (null != engineNode.getLabels) {
engineConnExecutor.setLabels(engineNode.getLabels.asScala.toList.toArray)
}
+ engineConnExecutor.setReuse(reuse)
return engineConnExecutor
}
} catch {
@@ -128,7 +129,7 @@ class ComputationEngineConnManager extends
AbstractEngineConnManager with Loggin
private def getEngineNodeAskManager(
engineAskRequest: EngineAskRequest,
mark: Mark
- ): EngineNode = {
+ ): (EngineNode, Boolean) = {
val response = Utils.tryCatch(getManagerSender().ask(engineAskRequest)) {
t: Throwable =>
val baseMsg = s"mark ${mark.getMarkId()} failed to ask linkis Manager
Can be retried "
ExceptionUtils.getRootCause(t) match {
@@ -144,8 +145,8 @@ class ComputationEngineConnManager extends
AbstractEngineConnManager with Loggin
}
response match {
case engineNode: EngineNode =>
- logger.debug("Succeed to get engineNode {} mark {}", engineNode: Any,
mark.getMarkId(): Any)
- engineNode
+ logger.debug(s"Succeed to reuse engineNode $engineNode mark
${mark.getMarkId()}")
+ (engineNode, true)
case EngineAskAsyncResponse(id, serviceInstance) =>
logger.info(
"{} received EngineAskAsyncResponse id: {} serviceInstance: {}",
@@ -160,7 +161,7 @@ class ComputationEngineConnManager extends
AbstractEngineConnManager with Loggin
"{} async id: {} success to async get EngineNode {}",
Array(mark.getMarkId(), id, engineNode): _*
)
- engineNode
+ (engineNode, false)
case EngineCreateError(id, exception, retry) =>
logger.debug(
"{} async id: {} Failed to async get EngineNode, {}",
@@ -184,7 +185,7 @@ class ComputationEngineConnManager extends
AbstractEngineConnManager with Loggin
mark.getMarkId(): Any,
engineAskRequest: Any
)
- null
+ (null, false)
}
}
diff --git
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
index 10395dcf7..c0d225065 100644
---
a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
+++
b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/ecm/service/EngineConnExecutor.scala
@@ -70,6 +70,10 @@ trait EngineConnExecutor extends Closeable {
def updateLastUpdateTime(): Unit
+ def isReuse(): Boolean
+
+ def setReuse(reuse: Boolean): EngineConnExecutor
+
override def equals(other: Any): Boolean = other match {
case that: EngineConnExecutor =>
(that canEqual this) &&
@@ -99,6 +103,8 @@ abstract class AbstractEngineConnExecutor extends
EngineConnExecutor with Loggin
private val runningTask: util.Map[String, RequestTask] =
new ConcurrentHashMap[String, RequestTask]()
+ private var reuse: Boolean = false
+
override def getLastUpdateTime(): Long = lastUpdateTime
override def updateLastUpdateTime(): Unit = lastUpdateTime =
System.currentTimeMillis()
@@ -123,4 +129,11 @@ abstract class AbstractEngineConnExecutor extends
EngineConnExecutor with Loggin
}
}
+ override def isReuse(): Boolean = reuse
+
+ override def setReuse(reuse: Boolean): EngineConnExecutor = {
+ this.reuse = reuse
+ this
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]