This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch release-1.3.1
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/release-1.3.1 by this push:
     new 93ba1ae09 Report the final exit status when EC exits (#4574)
93ba1ae09 is described below

commit 93ba1ae09c2fedd9207d0d0a95028434621ba9c8
Author: zhangwejun <[email protected]>
AuthorDate: Fri May 26 11:25:09 2023 +0800

    Report the final exit status when EC exits (#4574)
    
    * Report the final exit status when EC exits
    
    * Report the final exit status when EC exits
---
 .../engineconn/once/executor/OnceExecutor.scala    |  1 +
 .../execution/AccessibleEngineConnExecution.scala  | 11 +++++-----
 .../executor/service/AccessibleService.scala       |  1 +
 .../engine/DefaultEngineCreateService.scala        |  2 ++
 .../service/engine/DefaultEngineStopService.scala  |  7 +++++-
 .../manager/rm/service/ResourceManager.scala       |  5 +++--
 .../rm/service/impl/DefaultResourceManager.scala   | 25 ++++++++++++++--------
 .../protocol/engine/EngineConnReleaseRequest.java  | 11 ++++++++++
 8 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
index 67f8af0fa..dee4a885d 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/scala/org/apache/linkis/engineconn/once/executor/OnceExecutor.scala
@@ -209,6 +209,7 @@ trait ManageableOnceExecutor extends AccessibleExecutor with OnceExecutor with R
       msg,
       EngineConnObject.getEngineCreationContext.getTicketId
     )
+    engineReleaseRequest.setNodeStatus(getStatus)
     Utils.tryAndWarn(Thread.sleep(500))
     logger.info("To send release request to linkis manager")
     ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
index fb5dbb645..581b08a5e 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/execution/AccessibleEngineConnExecution.scala
@@ -94,15 +94,16 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
               )
               return
           }
+          val nodeStatus = accessibleExecutor.getStatus
           if (NodeStatus.isCompleted(accessibleExecutor.getStatus)) {
             logger.error(
               s"${accessibleExecutor.getId} has completed with status ${accessibleExecutor.getStatus}, now stop it."
             )
-            requestManagerReleaseExecutor("Completed release")
+            requestManagerReleaseExecutor("Completed release", nodeStatus)
             ShutdownHook.getShutdownHook.notifyStop()
           } else if (accessibleExecutor.getStatus == NodeStatus.ShuttingDown) {
             logger.warn(s"${accessibleExecutor.getId} is ShuttingDown...")
-            requestManagerReleaseExecutor(" ShuttingDown release")
+            requestManagerReleaseExecutor(" ShuttingDown release", nodeStatus)
             ShutdownHook.getShutdownHook.notifyStop()
           } else if (
               maxFreeTime > 0 && (NodeStatus.Unlock.equals(
@@ -118,7 +119,7 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
                 s"${accessibleExecutor.getId} has not been used for $maxFreeTimeStr, now try to shutdown it."
               )
               accessibleExecutor.tryShutdown()
-              requestManagerReleaseExecutor(" idle release")
+              requestManagerReleaseExecutor(" idle release", nodeStatus)
               ShutdownHook.getShutdownHook.notifyStop()
             }
 
@@ -131,14 +132,14 @@ class AccessibleEngineConnExecution extends EngineConnExecution with Logging {
     )
   }
 
-  def requestManagerReleaseExecutor(msg: String): Unit = {
+  def requestManagerReleaseExecutor(msg: String, nodeStatus: NodeStatus): Unit = {
     val engineReleaseRequest = new EngineConnReleaseRequest(
       Sender.getThisServiceInstance,
       Utils.getJvmUser,
       msg,
       EngineConnObject.getEngineCreationContext.getTicketId
     )
-    Utils.tryAndWarn(Thread.sleep(100))
+    engineReleaseRequest.setNodeStatus(nodeStatus)
     logger.info("To send release request to linkis manager")
     ManagerService.getManagerService.requestReleaseEngineConn(engineReleaseRequest)
   }
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
index f103a6635..604a9a1d2 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/AccessibleService.scala
@@ -32,6 +32,7 @@ trait AccessibleService extends ExecutorStatusListener {
 
   def dealEngineStopRequest(engineSuicideRequest: EngineSuicideRequest, sender: Sender): Unit
 
+  @deprecated
   def requestManagerReleaseExecutor(msg: String): Unit
 
   def dealRequestNodeStatus(requestNodeStatus: RequestNodeStatus): ResponseNodeStatus
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 3c8b3279a..421c92a71 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -196,6 +196,7 @@ class DefaultEngineCreateService
           failedEcNode.getLabels.addAll(
             LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList), emNode.getLabels)
           )
+          failedEcNode.setNodeStatus(NodeStatus.Failed)
           engineStopService.engineConnInfoClear(failedEcNode)
         }
         throw t
@@ -220,6 +221,7 @@ class DefaultEngineCreateService
         failedEcNode.getLabels.addAll(
           LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList), emNode.getLabels)
         )
+        failedEcNode.setNodeStatus(NodeStatus.Failed)
         engineStopService.engineConnInfoClear(failedEcNode)
       }
       throw new LinkisRetryException(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
index 757fb393c..1dc4d92d2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.am.service.engine
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.manager.am.conf.AMConfiguration
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 import org.apache.linkis.manager.common.entity.node.EngineNode
 import org.apache.linkis.manager.common.exception.RMErrorException
 import org.apache.linkis.manager.common.protocol.engine.{
@@ -78,6 +79,9 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
       logger.info(s"Finished to kill engine invoke enginePointer ${node.getServiceInstance}")
     }(s"Failed to stop engine ${node.getServiceInstance}")
     node.setLabels(nodeLabelService.getNodeLabels(engineStopRequest.getServiceInstance))
+    if (null == node.getNodeStatus) {
+      node.setNodeStatus(NodeStatus.ShuttingDown)
+    }
     engineConnInfoClear(node)
     logger.info(
       s" user ${engineStopRequest.getUser} finished to stop engine ${engineStopRequest.getServiceInstance}"
@@ -92,7 +96,7 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
     logger.info(s"Start to clear ec info $ecNode")
     // 1. to clear engine resource
     Utils.tryCatch {
-      resourceManager.resourceReleased(ecNode.getLabels)
+      resourceManager.resourceReleased(ecNode)
     } {
       case exception: RMErrorException =>
         if (exception.getErrCode != RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode) {
@@ -136,6 +140,7 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
         s"Send stop  engine request ${engineConnReleaseRequest.getServiceInstance.toString}"
       )
       engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance))
+      engineNode.setNodeStatus(engineConnReleaseRequest.getNodeStatus)
       engineConnInfoClear(engineNode)
     } else {
       logger.warn(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
index d22ad64f8..78e414d43 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceManager.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.manager.rm.service
 
 import org.apache.linkis.common.ServiceInstance
+import org.apache.linkis.manager.common.entity.node.EngineNode
 import org.apache.linkis.manager.common.entity.resource.NodeResource
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.rm.{ResourceInfo, ResultResource}
@@ -82,9 +83,9 @@ abstract class ResourceManager {
   /**
    * Method called when the resource usage is released 当资源使用完成释放后,调用的方法
    *
-   * @param labels
+   * @param ecNode
    */
-  def resourceReleased(labels: util.List[Label[_]]): Unit
+  def resourceReleased(ecNode: EngineNode): Unit
 
   /**
    * If the IP and port are empty, return the resource status of all modules of a module   * Return
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
index 8358c21ec..e720e4733 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
@@ -23,7 +23,7 @@ import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.manager.am.service.engine.EngineStopService
 import org.apache.linkis.manager.common.conf.RMConfiguration
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, InfoRMNode}
+import org.apache.linkis.manager.common.entity.node.{AMEMNode, AMEngineNode, EngineNode, InfoRMNode}
 import org.apache.linkis.manager.common.entity.persistence.{
   PersistenceLabel,
   PersistenceLock,
@@ -211,6 +211,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
     ecNodes.foreach { engineNode =>
       Utils.tryAndWarn {
         engineNode.setLabels(nodeLabelService.getNodeLabels(engineNode.getServiceInstance))
+        engineNode.setNodeStatus(NodeStatus.Failed)
         engineStopService.engineConnInfoClear(engineNode)
       }
     }
@@ -368,9 +369,6 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
    * 当资源被实例化后,返回实际占用的资源总量
    *
    * @param labels
-   *   In general, resourceReleased will release the resources occupied by the user, but if the
-   *   process that uses the resource does not have time to call the resourceReleased method to die,
-   *   you need to unregister to release the resource.
    * @param usedResource
    */
   override def resourceUsed(labels: util.List[Label[_]], usedResource: NodeResource): Unit = {
@@ -610,10 +608,10 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
   /**
    * Method called when the resource usage is released 当资源使用完成释放后,调用的方法
    *
-   * @param labels
+   * @param ecNode
    */
-  override def resourceReleased(labels: util.List[Label[_]]): Unit = {
-    val labelContainer = labelResourceService.enrichLabels(labels)
+  override def resourceReleased(ecNode: EngineNode): Unit = {
+    val labelContainer = labelResourceService.enrichLabels(ecNode.getLabels)
     if (null == labelContainer.getEngineInstanceLabel) {
       throw new RMErrorException(
         RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode,
@@ -638,7 +636,12 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
       logger.info(
         s"resourceRelease ready:${labelContainer.getEngineInstanceLabel.getServiceInstance},current node resource${usedResource}"
       )
-      val status = getNodeStatus(labelContainer.getEngineInstanceLabel)
+
+      val status = if (null == ecNode.getNodeStatus) {
+        getNodeStatus(labelContainer.getEngineInstanceLabel)
+      } else {
+        ecNode.getNodeStatus
+      }
 
       labelContainer.getResourceLabels.asScala
         .filter(!_.isInstanceOf[EngineInstanceLabel])
@@ -844,7 +847,11 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
               logger.warn(
                 s"serviceInstance ${engineInstanceLabel.getServiceInstance} lock resource timeout, clear resource"
               )
-              resourceReleased(labels)
+              val ecNode = new AMEngineNode()
+              ecNode.setServiceInstance(engineInstanceLabel.getServiceInstance)
+              ecNode.setNodeStatus(NodeStatus.Failed)
+              ecNode.setLabels(labels)
+              resourceReleased(ecNode)
             case _ =>
           }
         }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
index b7c8172b3..31e269d49 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineConnReleaseRequest.java
@@ -18,6 +18,7 @@
 package org.apache.linkis.manager.common.protocol.engine;
 
 import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
 
 public class EngineConnReleaseRequest implements EngineRequest {
 
@@ -27,6 +28,8 @@ public class EngineConnReleaseRequest implements EngineRequest {
 
   private String msg;
 
+  private NodeStatus nodeStatus;
+
   public String getTicketId() {
     return ticketId;
   }
@@ -71,4 +74,12 @@ public class EngineConnReleaseRequest implements 
EngineRequest {
   public void setMsg(String msg) {
     this.msg = msg;
   }
+
+  public NodeStatus getNodeStatus() {
+    return nodeStatus;
+  }
+
+  public void setNodeStatus(NodeStatus nodeStatus) {
+    this.nodeStatus = nodeStatus;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to