This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch release-1.3.1
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/release-1.3.1 by this push:
new 93ba1ae09 Report the final exit status when EC exits (#4574)
93ba1ae09 is described below
commit 93ba1ae09c2fedd9207d0d0a95028434621ba9c8
Author: zhangwejun <[email protected]>
AuthorDate: Fri May 26 11:25:09 2023 +0800
Report the final exit status when EC exits (#4574)
* Report the final exit status when EC exits
* Report the final exit status when EC exits
---
.../engineconn/once/executor/OnceExecutor.scala | 1 +
.../execution/AccessibleEngineConnExecution.scala | 11 +++++-----
.../executor/service/AccessibleService.scala | 1 +
.../engine/DefaultEngineCreateService.scala | 2 ++
.../service/engine/DefaultEngineStopService.scala | 7 +++++-
.../manager/rm/service/ResourceManager.scala | 5 +++--
.../rm/service/impl/DefaultResourceManager.scala | 25 ++++++++++++++--------
.../protocol/engine/EngineConnReleaseRequest.java | 11 ++++++++++
8 files changed, 46 insertions(+), 17 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
index 67f8af0fa..dee4a885d 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
@@ -209,6 +209,7 @@ trait ManageableOnceExecutor extends AccessibleExecutor
with OnceExecutor with R
msg,
EngineConnObject.getEngineCreationContext.getTicketId
)
+ engineReleaseRequest.setNodeStatus(getStatus)
Utils.tryAndWarn(Thread.sleep(500))
logger.info("To send release request to linkis manager")
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index fb5dbb645..581b08a5e 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -94,15 +94,16 @@ class AccessibleEngineConnExecution extends
EngineConnExecution with Logging {
)
return
}
+ val nodeStatus = accessibleExecutor.getStatus
if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
logger.error(
s"${accessibleExecutor.getId} has completed with status
${accessibleExecutor.getStatus}, now stop it."
)
- requestManagerReleaseExecutor("Completed release")
+ requestManagerReleaseExecutor("Completed release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
} else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
logger.warn(s"${accessibleExecutor.getId} is ShuttingDown...")
- requestManagerReleaseExecutor(" ShuttingDown release")
+ requestManagerReleaseExecutor(" ShuttingDown release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
} else if (
maxFreeTime > 0 && (NodeStatus.Unlock.equals(
@@ -118,7 +119,7 @@ class AccessibleEngineConnExecution extends
EngineConnExecution with Logging {
s"${accessibleExecutor.getId} has not been used for
$maxFreeTimeStr, now try to shutdown it."
)
accessibleExecutor.tryShutdown()
- requestManagerReleaseExecutor(" idle release")
+ requestManagerReleaseExecutor(" idle release", nodeStatus)
ShutdownHook.getShutdownHook.notifyStop()
}
@@ -131,14 +132,14 @@ class AccessibleEngineConnExecution extends
EngineConnExecution with Logging {
)
}
- def requestManagerReleaseExecutor(msg: String): Unit = {
+ def requestManagerReleaseExecutor(msg: String, nodeStatus: NodeStatus): Unit
= {
val engineReleaseRequest = new EngineConnReleaseRequest(
Sender.getThisServiceInstance,
Utils.getJvmUser,
msg,
EngineConnObject.getEngineCreationContext.getTicketId
)
- Utils.tryAndWarn(Thread.sleep(100))
+ engineReleaseRequest.setNodeStatus(nodeStatus)
logger.info("To send release request to linkis manager")
ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
index f103a6635..604a9a1d2 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
@@ -32,6 +32,7 @@ trait AccessibleService extends ExecutorStatusListener {
def dealEngineStopRequest(engineSuicideRequest: EngineSuicideRequest,
sender: Sender): Unit
+ @deprecated
def requestManagerReleaseExecutor(msg: String): Unit
def dealRequestNodeStatus(requestNodeStatus: RequestNodeStatus):
ResponseNodeStatus
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 3c8b3279a..421c92a71 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -196,6 +196,7 @@ class DefaultEngineCreateService
failedEcNode.getLabels.addAll(
LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList),
emNode.getLabels)
)
+ failedEcNode.setNodeStatus(NodeStatus.Failed)
engineStopService.engineConnInfoClear(failedEcNode)
}
throw t
@@ -220,6 +221,7 @@ class DefaultEngineCreateService
failedEcNode.getLabels.addAll(
LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList),
emNode.getLabels)
)
+ failedEcNode.setNodeStatus(NodeStatus.Failed)
engineStopService.engineConnInfoClear(failedEcNode)
}
throw new LinkisRetryException(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
index 757fb393c..1dc4d92d2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.am.service.engine
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.am.conf.AMConfiguration
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.node.EngineNode
import org.apache.linkis.manager.common.exception.RMErrorException
import org.apache.linkis.manager.common.protocol.engine.{
@@ -78,6 +79,9 @@ class DefaultEngineStopService extends AbstractEngineService
with EngineStopServ
logger.info(s"Finished to kill engine invoke enginePointer
${node.getServiceInstance}")
}(s"Failed to stop engine ${node.getServiceInstance}")
node.setLabels(nodeLabelService.getNodeLabels(engineStopRequest.getServiceInstance))
+ if (null == node.getNodeStatus) {
+ node.setNodeStatus(NodeStatus.ShuttingDown)
+ }
engineConnInfoClear(node)
logger.info(
s" user ${engineStopRequest.getUser} finished to stop engine
${engineStopRequest.getServiceInstance}"
@@ -92,7 +96,7 @@ class DefaultEngineStopService extends AbstractEngineService
with EngineStopServ
logger.info(s"Start to clear ec info $ecNode")
// 1. to clear engine resource
Utils.tryCatch {
- resourceManager.resourceReleased(ecNode.getLabels)
+ resourceManager.resourceReleased(ecNode)
} {
case exception: RMErrorException =>
if (exception.getErrCode !=
RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode) {
@@ -136,6 +140,7 @@ class DefaultEngineStopService extends
AbstractEngineService with EngineStopServ
s"Send stop engine request
${engineConnReleaseRequest.getServiceInstance.toString}"
)
engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance))
+ engineNode.setNodeStatus(engineConnReleaseRequest.getNodeStatus)
engineConnInfoClear(engineNode)
} else {
logger.warn(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
index d22ad64f8..78e414d43 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.service
import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.manager.common.entity.node.EngineNode
import org.apache.linkis.manager.common.entity.resource.NodeResource
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.rm.{ResourceInfo, ResultResource}
@@ -82,9 +83,9 @@ abstract class ResourceManager {
/**
* Method called when the resource usage is released 当资源使用完成释放后,调用的方法
*
- * @param labels
+ * @param ecNode
*/
- def resourceReleased(labels: util.List[Label[_]]): Unit
+ def resourceReleased(ecNode: EngineNode): Unit
/**
* If the IP and port are empty, return the resource status of all modules
of a module * Return
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
index 8358c21ec..e720e4733 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
@@ -23,7 +23,7 @@ import
org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.am.service.engine.EngineStopService
import org.apache.linkis.manager.common.conf.RMConfiguration
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode,
InfoRMNode}
+import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode,
EngineNode, InfoRMNode}
import org.apache.linkis.manager.common.entity.persistence.{
PersistenceLabel,
PersistenceLock,
@@ -211,6 +211,7 @@ class DefaultResourceManager extends ResourceManager with
Logging with Initializ
ecNodes.foreach { engineNode =>
Utils.tryAndWarn {
engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance))
+ engineNode.setNodeStatus(NodeStatus.Failed)
engineStopService.engineConnInfoClear(engineNode)
}
}
@@ -368,9 +369,6 @@ class DefaultResourceManager extends ResourceManager with
Logging with Initializ
* 当资源被实例化后,返回实际占用的资源总量
*
* @param labels
- * In general, resourceReleased will release the resources occupied by the
user, but if the
- * process that uses the resource does not have time to call the
resourceReleased method to die,
- * you need to unregister to release the resource.
* @param usedResource
*/
override def resourceUsed(labels: util.List[Label[_]], usedResource:
NodeResource): Unit = {
@@ -610,10 +608,10 @@ class DefaultResourceManager extends ResourceManager with
Logging with Initializ
/**
* Method called when the resource usage is released 当资源使用完成释放后,调用的方法
*
- * @param labels
+ * @param ecNode
*/
- override def resourceReleased(labels: util.List[Label[_]]): Unit = {
- val labelContainer = labelResourceService.enrichLabels(labels)
+ override def resourceReleased(ecNode: EngineNode): Unit = {
+ val labelContainer = labelResourceService.enrichLabels(ecNode.getLabels)
if (null == labelContainer.getEngineInstanceLabel) {
throw new RMErrorException(
RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode,
@@ -638,7 +636,12 @@ class DefaultResourceManager extends ResourceManager with
Logging with Initializ
logger.info(
s"resourceRelease
ready:${labelContainer.getEngineInstanceLabel.getServiceInstance},current node
resource${usedResource}"
)
- val status = getNodeStatus(labelContainer.getEngineInstanceLabel)
+
+ val status = if (null == ecNode.getNodeStatus) {
+ getNodeStatus(labelContainer.getEngineInstanceLabel)
+ } else {
+ ecNode.getNodeStatus
+ }
labelContainer.getResourceLabels.asScala
.filter(!_.isInstanceOf[EngineInstanceLabel])
@@ -844,7 +847,11 @@ class DefaultResourceManager extends ResourceManager with
Logging with Initializ
logger.warn(
s"serviceInstance ${engineInstanceLabel.getServiceInstance}
lock resource timeout, clear resource"
)
- resourceReleased(labels)
+ val ecNode = new AMEngineNode()
+ ecNode.setServiceInstance(engineInstanceLabel.getServiceInstance)
+ ecNode.setNodeStatus(NodeStatus.Failed)
+ ecNode.setLabels(labels)
+ resourceReleased(ecNode)
case _ =>
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
index b7c8172b3..31e269d49 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.common.protocol.engine;
import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
public class EngineConnReleaseRequest implements EngineRequest {
@@ -27,6 +28,8 @@ public class EngineConnReleaseRequest implements
EngineRequest {
private String msg;
+ private NodeStatus nodeStatus;
+
public String getTicketId() {
return ticketId;
}
@@ -71,4 +74,12 @@ public class EngineConnReleaseRequest implements
EngineRequest {
public void setMsg(String msg) {
this.msg = msg;
}
+
+ public NodeStatus getNodeStatus() {
+ return nodeStatus;
+ }
+
+ public void setNodeStatus(NodeStatus nodeStatus) {
+ this.nodeStatus = nodeStatus;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]