This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b6d6c0e4 add support for detach manage mode of flink EC (#4796)
9b6d6c0e4 is described below

commit 9b6d6c0e4a33a11b6c6fb44114adbab4e75da2a0
Author: Alexyang <[email protected]>
AuthorDate: Mon Jul 24 10:29:53 2023 +0800

    add support for detach manage mode of flink EC (#4796)
---
 .../client/once/LinkisManagerClient.scala          |  21 +-
 .../client/once/action/AskEngineConnAction.scala   | 101 +++++++++
 .../client/once/result/AskEngineConnResult.scala}  |  14 +-
 .../once/result/EngineConnOperateResult.scala      |   4 +-
 .../client/once/simple/SimpleOnceJob.scala         |   4 +
 .../impl/EngineConnApplicationInfoOperator.scala   |   7 +-
 .../common/enums/OnceJobOperationBoundary.java}    |  18 +-
 .../engineconn/EngineConnExecutorErrorCode.java    |   2 +
 .../common/conf/GovernanceCommonConf.scala         |   4 +
 .../common/constant/ec/ECConstants.scala           |  39 ++++
 .../concurrent/monitor/TimingMonitorService.java   |   2 +-
 .../service/DefaultNodeHeartbeatMsgManager.scala   |  21 +-
 .../engineconn/core/executor/ExecutorManager.scala |   2 +-
 .../impl/EngineConnApplicationInfoOperator.java    |   9 +-
 .../conf/AccessibleExecutorConfiguration.scala     |   4 +-
 .../AccessibleExecutorSpringConfiguration.scala    |   8 +-
 .../acessible/executor/hook/OperationHook.scala    |  50 +++++
 .../service/DefaultAccessibleService.scala         |   2 +-
 .../service/DefaultExecutorHeartbeatService.scala  |   1 +
 .../executor/service/DefaultOperateService.scala   |  47 +++-
 .../service/EngineConnTimedLockService.scala       |   2 +-
 .../service/ExecutorHeartbeatService.scala         |  11 +
 .../linkis/manager/am/exception/AMErrorCode.java   |   6 +-
 .../manager/am/restful/EngineRestfulApi.java       | 219 ++++++++++++++++--
 .../engine/DefaultEngineAskEngineService.java      |  10 +-
 .../manager/am/util/ECResourceInfoUtils.java       |   1 +
 .../apache/linkis/manager/am/utils/AMUtils.java    |   9 +
 .../manager/label/constant/LabelKeyConstant.java   |   2 +
 .../manager/label/entity/engine/ManagerLabel.java  |  53 +++++
 .../manager/label/entity/engine/RunType.scala      |   1 +
 .../linkis/manager/common/constant/AMConstant.java |  26 +++
 .../manager/common/entity/node/AMEngineNode.java   |  12 +
 .../manager/common/entity/node/EngineNode.java     |   8 +
 .../entity/persistence/PersistencerEcNodeInfo.java |   5 +-
 .../flink/operator/TriggerSavepointOperator.java   |  92 --------
 .../flink/config/FlinkEnvConfiguration.scala       |  17 ++
 .../flink/config/FlinkSrpingConfiguration.scala    |  46 ++--
 .../flink/constants/FlinkECConstant.scala          |  20 +-
 .../flink/context/EnvironmentContext.scala         |  14 +-
 .../flink/executor/FlinkJarOnceExecutor.scala      |  87 +++++++-
 .../executor/FlinkManagerConcurrentExecutor.scala  | 112 ++++++++++
 .../flink/executor/FlinkOnceExecutor.scala         |  10 +-
 .../flink/factory/FlinkEngineConnFactory.scala     |  36 ++-
 .../factory/FlinkManagerExecutorFactory.scala      |  71 ++++++
 .../flink/hook/EngineLoadOperationHook.scala       | 112 ++++++++++
 .../flink/operator/KillOperator.scala              |  92 ++++++++
 .../flink/operator/ListOperator.scala              | 115 ++++++++++
 .../flink/operator/StatusOperator.scala            |  99 +++++++++
 .../flink/operator/TriggerSavepointOperator.scala  |  93 ++++++++
 .../clientmanager/FlinkRestClientManager.scala     |  97 ++++++++
 .../engineconnplugin/flink/util/ManagerUtil.scala  |  16 +-
 .../engineconnplugin/flink/util/YarnUtil.scala     | 245 +++++++++++++++++++++
 .../hive/creation/HiveEngineConnFactory.scala      |   2 +-
 .../shell/factory/ShellEngineConnFactory.scala     |   2 +-
 54 files changed, 1907 insertions(+), 196 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
index 3ac3cb7c8..45f3f49be 100644
--- 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
@@ -17,7 +17,9 @@
 
 package org.apache.linkis.computation.client.once
 
+import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.computation.client.once.action.{
+  AskEngineConnAction,
   CreateEngineConnAction,
   EngineConnOperateAction,
   GetEngineConnAction,
@@ -25,6 +27,7 @@ import org.apache.linkis.computation.client.once.action.{
   LinkisManagerAction
 }
 import org.apache.linkis.computation.client.once.result.{
+  AskEngineConnResult,
   CreateEngineConnResult,
   EngineConnOperateResult,
   GetEngineConnResult,
@@ -39,6 +42,8 @@ import java.io.Closeable
 
 trait LinkisManagerClient extends Closeable {
 
+  def askEngineConn(askEngineConnAction: AskEngineConnAction): 
AskEngineConnResult
+
   def createEngineConn(createEngineConnAction: CreateEngineConnAction): 
CreateEngineConnResult
 
   def getEngineConn(getEngineConnAction: GetEngineConnAction): 
GetEngineConnResult
@@ -82,7 +87,21 @@ class LinkisManagerClientImpl(ujesClient: UJESClient) 
extends LinkisManagerClien
 
   override def executeEngineConnOperation(
       engineConnOperateAction: EngineConnOperateAction
-  ): EngineConnOperateResult = execute(engineConnOperateAction)
+  ): EngineConnOperateResult = {
+    Utils.tryCatch {
+      val rs = execute[EngineConnOperateResult](engineConnOperateAction)
+      rs
+    } { case e: Exception =>
+      val rs = new EngineConnOperateResult
+      rs.setIsError(true)
+      rs.setErrorMsg(e.getMessage)
+      rs
+    }
+  }
 
   override def close(): Unit = ujesClient.close()
+
+  override def askEngineConn(askEngineConnAction: AskEngineConnAction): 
AskEngineConnResult =
+    execute(askEngineConnAction)
+
 }
diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala
new file mode 100644
index 000000000..4b89b5376
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.action
+
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import org.apache.linkis.httpclient.request.POSTAction
+import org.apache.linkis.ujes.client.exception.UJESJobException
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+class AskEngineConnAction extends POSTAction with LinkisManagerAction {
+
+  override def getRequestPayload: String =
+    DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
+
+  override def suffixURLs: Array[String] = Array("linkisManager", 
"askEngineConn")
+
+}
+
+object AskEngineConnAction {
+
+  def newBuilder(): Builder = new Builder
+
+  class Builder private[AskEngineConnAction] () {
+    private var user: String = _
+    private var properties: util.Map[String, String] = _
+    private var labels: util.Map[String, String] = _
+    private var maxSubmitTime: Long = _
+    private var createService: String = _
+    private var description: String = _
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    def setProperties(properties: util.Map[String, String]): Builder = {
+      this.properties = properties
+      this
+    }
+
+    def setLabels(labels: java.util.Map[String, String]): Builder = {
+      this.labels = labels
+      this
+    }
+
+    def setMaxSubmitTime(maxSubmitTime: Long): Builder = {
+      this.maxSubmitTime = maxSubmitTime
+      this
+    }
+
+    def setCreateService(createService: String): Builder = {
+      this.createService = createService
+      this
+    }
+
+    def setDescription(description: String): Builder = {
+      this.description = description
+      this
+    }
+
+    def build(): AskEngineConnAction = {
+      val action = new AskEngineConnAction()
+      if (user == null) throw new UJESJobException("user is needed!")
+      if (properties == null) properties = new java.util.HashMap[String, 
String]
+      if (labels == null) throw new UJESJobException("labels is needed!")
+      action.setUser(user)
+      action.addRequestPayload("properties", properties)
+      action.addRequestPayload("labels", labels)
+      if (StringUtils.isNotBlank(createService)) {
+        action.addRequestPayload("createService", createService)
+      }
+      if (null != maxSubmitTime) {
+        action.addRequestPayload("timeOut", maxSubmitTime)
+      }
+      if (StringUtils.isNotBlank(description)) {
+        action.addRequestPayload("description", description)
+      }
+      action
+    }
+
+  }
+
+}
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
similarity index 75%
copy from 
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to 
linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
index e3b8548bf..58c6085b4 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
@@ -15,15 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.computation.client.once.result
 
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
 
-  EMNode getEMNode();
-
-  void setEMNode(EMNode emNode);
-
-  String getLock();
-
-  void setLock(String lock);
-}
+@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/askEngineConn")
+class AskEngineConnResult extends GetEngineConnResult
diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
index 1bf12e041..50df73bd1 100644
--- 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
@@ -33,9 +33,11 @@ class EngineConnOperateResult extends LinkisManagerResult {
     this.result = result
   }
 
+  def getErrorMsg(): String = errorMsg
+
   def setErrorMsg(errorMsg: String): Unit = this.errorMsg = errorMsg
 
-  def setError(isError: Boolean): Unit = this.isError = isError
+  def getIsError(): Boolean = isError
 
   def setIsError(isError: Boolean): Unit = this.isError = isError
 
diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
index 492ae76b6..4992b1781 100644
--- 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
@@ -95,6 +95,10 @@ trait SimpleOnceJob extends OnceJob {
     case operator => operator
   }
 
+  def getEcServiceInstance: ServiceInstance = serviceInstance
+
+  def getEcTicketId: String = ticketId
+
 }
 
 class SubmittableSimpleOnceJob(
diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
index 83399bf37..a1dba6340 100644
--- 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.computation.client.operator.impl
 
 import org.apache.linkis.computation.client.once.result.EngineConnOperateResult
 import org.apache.linkis.computation.client.operator.OnceJobOperator
+import org.apache.linkis.governance.common.constant.ec.ECConstants
 import org.apache.linkis.ujes.client.exception.UJESJobException
 
 class EngineConnApplicationInfoOperator extends 
OnceJobOperator[ApplicationInfo] {
@@ -28,7 +29,7 @@ class EngineConnApplicationInfoOperator extends 
OnceJobOperator[ApplicationInfo]
   override protected def resultToObject(result: EngineConnOperateResult): 
ApplicationInfo = {
     ApplicationInfo(
       result
-        .getAsOption("applicationId")
+        .getAsOption(ECConstants.YARN_APPID_NAME_KEY)
         .getOrElse(
           throw new UJESJobException(
             20300,
@@ -36,14 +37,14 @@ class EngineConnApplicationInfoOperator extends 
OnceJobOperator[ApplicationInfo]
           )
         ),
       result
-        .getAsOption("applicationUrl")
+        .getAsOption(ECConstants.YARN_APP_URL_KEY)
         .getOrElse(
           throw new UJESJobException(
             20300,
             s"Cannot get applicationUrl from EngineConn $getServiceInstance."
           )
         ),
-      result.getAs("queue")
+      result.getAs(ECConstants.QUEUE)
     )
   }
 
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java
similarity index 75%
copy from 
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to 
linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java
index e3b8548bf..37c6fc8d9 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java
@@ -15,15 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.governance.common.enums;
 
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+public enum OnceJobOperationBoundary {
+  ECM("ecm"),
+  EC("ec");
 
-  EMNode getEMNode();
+  private String name;
 
-  void setEMNode(EMNode emNode);
+  OnceJobOperationBoundary(String name) {
+    this.name = name;
+  }
 
-  String getLock();
-
-  void setLock(String lock);
+  public String getName() {
+    return name;
+  }
 }
diff --git 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
index c0d295755..89d3c9eba 100644
--- 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
+++ 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
@@ -37,4 +37,6 @@ public class EngineConnExecutorErrorCode {
   public static final int SEND_TO_ENTRANCE_ERROR = 40105;
 
   public static final int INIT_EXECUTOR_FAILED = 40106;
+
+  public static final int INVALID_APPLICATION_ID = 40107;
 }
diff --git 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
index fc7d1c890..0029faa91 100644
--- 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
+++ 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
@@ -86,6 +86,10 @@ object GovernanceCommonConf {
     CommonVars(envKey, "").getValue
   }
 
+  // value ECConstants.EC_CLIENT_TYPE_ATTACH
+  val EC_APP_MANAGE_MODE =
+    CommonVars("linkis.ec.app.manage.mode", "attach")
+
   val SCALA_PARSE_APPEND_CODE_ENABLED =
     CommonVars("linkis.scala.parse.append.code.enable", true).getValue
 
diff --git 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
index fe48f6887..a94eadf42 100644
--- 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
+++ 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
@@ -35,4 +35,43 @@ object ECConstants {
 
   val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue"
 
+  val QUEUE = "queue"
+
+  val EC_CLIENT_TYPE_ATTACH = "attach"
+
+  val EC_CLIENT_TYPE_DETACH = "detach"
+
+  val YARN_APPID_NAME_KEY = "applicationId"
+
+  val YARN_APP_URL_KEY = "applicationUrl"
+
+  val YARN_APP_NAME_KEY = "appicationName"
+
+  val YARN_MODE_KEY = "yarnMode"
+
+  val EC_SERVICE_INSTANCE_KEY = "serviceInstance"
+
+  val ECM_SERVICE_INSTANCE_KEY = "ecmServiceInstance"
+
+  val MANAGER_SERVICE_INSTANCE_KEY = "managerServiceInstance"
+
+  val NODE_STATUS_KEY = "nodeStatus"
+
+  val EC_LAST_UNLOCK_TIMESTAMP = "lastUnlockTimestamp"
+
+  val YARN_APP_TYPE_LIST_KEY = "yarnAppTypeList"
+
+  val YARN_APP_STATE_LIST_KEY = "yarnAppStateList"
+
+  val YARN_APP_TYPE_KEY = "yarnAppType"
+
+  val YARN_APP_TYPE_SPARK = "spark"
+
+  val YARN_APP_TYPE_FLINK = "flink"
+
+  val EC_OPERATE_LIST = "list"
+
+  val EC_OPERATE_STATUS = "status"
+
+  val YARN_APP_RESULT_LIST_KEY = "yarnAppResultList"
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
index de6bb440d..a84f58115 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
@@ -55,7 +55,7 @@ public class TimingMonitorService implements 
InitializingBean, Runnable {
 
   @Override
   public void afterPropertiesSet() throws Exception {
-    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM()) {
+    if ((Boolean) 
AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM().getValue()) {
       Utils.defaultScheduler()
           .scheduleAtFixedRate(
               this, 3 * 60 * 1000, MONITOR_INTERVAL.getValue().toLong(), 
TimeUnit.MILLISECONDS);
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
index eccf54bfa..010ced97f 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
@@ -21,10 +21,13 @@ import org.apache.linkis.common.utils.{Logging, Utils}
 import 
org.apache.linkis.engineconn.acessible.executor.info.NodeHeartbeatMsgManager
 import 
org.apache.linkis.engineconn.computation.executor.metrics.ComputationEngineConnMetrics
 import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineconn.executor.entity.{Executor, 
SensibleExecutor}
+import org.apache.linkis.engineconn.executor.entity.{Executor, 
SensibleExecutor, YarnExecutor}
 import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 import org.apache.linkis.server.BDPJettyServerHelper
 
+import org.apache.commons.lang3.StringUtils
+
 import org.springframework.stereotype.Component
 
 import java.util
@@ -72,6 +75,22 @@ class DefaultNodeHeartbeatMsgManager extends 
NodeHeartbeatMsgManager with Loggin
         
engineParams.get(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY).asInstanceOf[Object]
       )
     }
+    executor match {
+      case yarnExecutor: YarnExecutor =>
+        if (StringUtils.isNotBlank(yarnExecutor.getQueue)) {
+          msgMap.put(ECConstants.YARN_QUEUE_NAME_KEY, yarnExecutor.getQueue)
+        }
+        if (StringUtils.isNotBlank(yarnExecutor.getApplicationId)) {
+          msgMap.put(ECConstants.YARN_APPID_NAME_KEY, 
yarnExecutor.getApplicationId)
+        }
+        if (StringUtils.isNotBlank(yarnExecutor.getApplicationURL)) {
+          msgMap.put(ECConstants.YARN_APP_URL_KEY, 
yarnExecutor.getApplicationURL)
+        }
+        if (StringUtils.isNotBlank(yarnExecutor.getYarnMode)) {
+          msgMap.put(ECConstants.YARN_MODE_KEY, yarnExecutor.getYarnMode)
+        }
+      case _ =>
+    }
     Utils.tryCatch(BDPJettyServerHelper.gson.toJson(msgMap)) { case e: 
Exception =>
       val msgs = msgMap.asScala
         .map { case (k, v) => if (null == v) s"${k}->null" else 
s"${k}->${v.toString}" }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
index 7c180731a..61242beaa 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
@@ -137,7 +137,7 @@ class LabelExecutorManagerImpl extends LabelExecutorManager 
with Logging {
   }
 
   protected def getLabelKey(labels: Array[Label[_]]): String =
-    labels.map(_.getStringValue).mkString("&")
+    labels.filter(null != _).map(_.getStringValue).mkString("&")
 
   protected def createExecutor(
       engineCreationContext: EngineCreationContext,
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
index 28a75d3f9..66e1c575f 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
@@ -22,6 +22,7 @@ import 
org.apache.linkis.engineconn.core.executor.ExecutorManager$;
 import org.apache.linkis.engineconn.core.executor.LabelExecutorManager;
 import org.apache.linkis.engineconn.executor.entity.Executor;
 import org.apache.linkis.engineconn.executor.entity.YarnExecutor;
+import org.apache.linkis.governance.common.constant.ec.ECConstants;
 import org.apache.linkis.manager.common.operator.Operator;
 
 import java.util.HashMap;
@@ -43,10 +44,10 @@ public class EngineConnApplicationInfoOperator implements 
Operator {
     if (reportExecutor instanceof YarnExecutor) {
       YarnExecutor yarnExecutor = (YarnExecutor) reportExecutor;
       Map<String, Object> result = new HashMap<>();
-      result.put("applicationId", yarnExecutor.getApplicationId());
-      result.put("applicationUrl", yarnExecutor.getApplicationURL());
-      result.put("queue", yarnExecutor.getQueue());
-      result.put("yarnMode", yarnExecutor.getYarnMode());
+      result.put(ECConstants.YARN_APPID_NAME_KEY(), 
yarnExecutor.getApplicationId());
+      result.put(ECConstants.YARN_APP_URL_KEY(), 
yarnExecutor.getApplicationURL());
+      result.put(ECConstants.QUEUE(), yarnExecutor.getQueue());
+      result.put(ECConstants.YARN_MODE_KEY(), yarnExecutor.getYarnMode());
       return result;
     } else {
       throw new EngineConnException(
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index 26a25a153..95a01202e 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -42,8 +42,8 @@ object AccessibleExecutorConfiguration {
   val ENGINECONN_LOCK_CHECK_INTERVAL =
     CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m"))
 
-  val ENGINECONN_SUPPORT_PARALLELISM: Boolean =
-    CommonVars("wds.linkis.engineconn.support.parallelism", false).getValue
+  val ENGINECONN_SUPPORT_PARALLELISM =
+    CommonVars("wds.linkis.engineconn.support.parallelism", false)
 
   val ENGINECONN_HEARTBEAT_TIME =
     CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m"))
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
index 53cdd44b0..93cb41f34 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
@@ -43,9 +43,13 @@ class AccessibleExecutorSpringConfiguration extends Logging {
   def createLockManager(): LockService = {
 
     val lockService =
-      if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+      if 
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) {
+        logger.info("Engine supports parallelism.")
         new EngineConnConcurrentLockService
-      } else new EngineConnTimedLockService
+      } else {
+        logger.info("Engine doesn't support parallelism.")
+        new EngineConnTimedLockService
+      }
     asyncListenerBusContext.addListener(lockService)
     lockService
   }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
new file mode 100644
index 000000000..12e42c66a
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.acessible.executor.hook
+
+import org.apache.linkis.manager.common.protocol.engine.{
+  EngineOperateRequest,
+  EngineOperateResponse
+}
+
+import scala.collection.mutable.ArrayBuffer
+
+trait OperationHook {
+  def getName(): String
+
+  def doPreOperation(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit
+
+  def doPostOperation(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit
+
+}
+
+object OperationHook {
+  private var operationHooks: ArrayBuffer[OperationHook] = new 
ArrayBuffer[OperationHook]()
+
+  def registerOperationHook(operationHook: OperationHook): Unit = {
+    operationHooks.append(operationHook)
+  }
+
+  def getOperationHooks(): Array[OperationHook] = operationHooks.toArray
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
index 06fd13b0e..8ef944fc9 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
@@ -74,7 +74,7 @@ class DefaultAccessibleService extends AccessibleService with 
Logging {
         
DataWorkCloudApplication.getServiceInstance.equals(engineSuicideRequest.getServiceInstance)
     ) {
       stopEngine()
-      logger.info(s"engine will suiside now.")
+      logger.info(s"engine was asked to suiside by 
${engineSuicideRequest.getUser} now.")
       ShutdownHook.getShutdownHook.notifyStop()
     } else {
       if (null != engineSuicideRequest.getServiceInstance) {
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
index 067e0d2cb..ea3248ba6 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
@@ -78,6 +78,7 @@ class DefaultExecutorHeartbeatService
       heartbeatTime,
       TimeUnit.MILLISECONDS
     )
+    ExecutorHeartbeatServiceHolder.registerHeartBeatService(this)
   }
 
   /**
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
index 1040df40a..f82c5e944 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.engineconn.acessible.executor.service
 
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook
 import org.apache.linkis.manager.common.operator.OperatorFactory
 import org.apache.linkis.manager.common.protocol.engine.{
   EngineOperateRequest,
@@ -31,6 +32,8 @@ import org.springframework.stereotype.Service
 
 import java.util
 
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
 @Service
 class DefaultOperateService extends OperateService with Logging {
 
@@ -38,27 +41,59 @@ class DefaultOperateService extends OperateService with 
Logging {
   override def executeOperation(
       engineOperateRequest: EngineOperateRequest
   ): EngineOperateResponse = {
-    val parameters = engineOperateRequest.getParameters()
+    var response: EngineOperateResponse = null
+
+    val parameters =
+      
engineOperateRequest.getParameters.asScala.toMap.asInstanceOf[util.Map[String, 
Object]]
     val operator = 
Utils.tryCatch(OperatorFactory.apply().getOperatorRequest(parameters)) { t =>
       logger.error(s"Get operator failed, parameters is 
${engineOperateRequest.getParameters}.", t)
-      return new EngineOperateResponse(
-        new util.HashMap,
+      response = new EngineOperateResponse(
+        new util.HashMap[String, Object](),
         true,
         ExceptionUtils.getRootCauseMessage(t)
       )
+      doPostHook(engineOperateRequest, response)
+      return response
     }
     logger.info(
       s"Try to execute operator ${operator.getClass.getSimpleName} with 
parameters ${engineOperateRequest.getParameters}."
     )
     val result = Utils.tryCatch(operator(parameters)) { t =>
       logger.error(s"Execute ${operator.getClass.getSimpleName} failed.", t)
-      return new EngineOperateResponse(
-        new util.HashMap,
+      response = new EngineOperateResponse(
+        new util.HashMap[String, Object](),
         true,
         ExceptionUtils.getRootCauseMessage(t)
       )
+      doPostHook(engineOperateRequest, response)
+      return response
+    }
+    logger.info(s"End to execute operator ${operator.getClass.getSimpleName}.")
+    response = new EngineOperateResponse(result)
+    doPostHook(engineOperateRequest, response)
+    response
+  }
+
+  private def doPreHook(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit = {
+    Utils.tryAndWarn {
+      OperationHook
+        .getOperationHooks()
+        .foreach(hook => hook.doPreOperation(engineOperateRequest, 
engineOperateResponse))
+    }
+  }
+
+  private def doPostHook(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit = {
+    Utils.tryAndWarn {
+      OperationHook
+        .getOperationHooks()
+        .foreach(hook => hook.doPostOperation(engineOperateRequest, 
engineOperateResponse))
     }
-    new EngineOperateResponse(result)
   }
 
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
index 21325f42b..b5bbc26f9 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
@@ -51,7 +51,7 @@ class EngineConnTimedLockService extends LockService with 
Logging {
   private var lockType: EngineLockType = EngineLockType.Timed
 
   private def isSupportParallelism: Boolean =
-    AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM
+    
AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()
 
   /**
    * @param lock
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
index bfecf7325..7abcbe8dc 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
@@ -34,3 +34,14 @@ trait ExecutorHeartbeatService {
   def dealNodeHeartbeatRequest(nodeHeartbeatRequest: NodeHeartbeatRequest): 
NodeHeartbeatMsg
 
 }
+
+object ExecutorHeartbeatServiceHolder {
+
+  private var executorHeartbeatService: ExecutorHeartbeatService = _
+
+  def registerHeartBeatService(executorHeartbeatService: 
ExecutorHeartbeatService): Unit =
+    this.executorHeartbeatService = executorHeartbeatService
+
+  def getDefaultHeartbeatService(): ExecutorHeartbeatService = 
executorHeartbeatService
+
+}
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
index 3734e3bdf..c05768739 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
@@ -28,7 +28,11 @@ public enum AMErrorCode implements LinkisErrorCode {
 
   NOT_EXISTS_ENGINE_CONN(210003, "Not exists EngineConn(不存在的引擎)"),
 
-  AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)");
+  AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)"),
+
+  ASK_ENGINE_ERROR_RETRY(210005, "Ask engine error, retry(请求引擎失败,重试)"),
+
+  EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)");
 
   private final int errorCode;
 
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
index 3d6e0bc39..14cad1380 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
@@ -23,28 +23,28 @@ import 
org.apache.linkis.common.exception.LinkisRetryException;
 import org.apache.linkis.common.utils.ByteTimeUtils;
 import org.apache.linkis.common.utils.JsonUtils;
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
+import org.apache.linkis.governance.common.constant.ec.ECConstants;
+import org.apache.linkis.governance.common.utils.JobUtils;
+import org.apache.linkis.governance.common.utils.LoggerUtils;
 import org.apache.linkis.manager.am.conf.AMConfiguration;
 import org.apache.linkis.manager.am.exception.AMErrorCode;
 import org.apache.linkis.manager.am.exception.AMErrorException;
 import org.apache.linkis.manager.am.manager.EngineNodeManager;
 import org.apache.linkis.manager.am.service.ECResourceInfoService;
-import org.apache.linkis.manager.am.service.engine.EngineCreateService;
-import org.apache.linkis.manager.am.service.engine.EngineInfoService;
-import org.apache.linkis.manager.am.service.engine.EngineOperateService;
-import org.apache.linkis.manager.am.service.engine.EngineStopService;
+import org.apache.linkis.manager.am.service.engine.*;
 import org.apache.linkis.manager.am.util.ECResourceInfoUtils;
 import org.apache.linkis.manager.am.utils.AMUtils;
 import org.apache.linkis.manager.am.vo.AMEngineNodeVo;
+import org.apache.linkis.manager.common.constant.AMConstant;
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
 import org.apache.linkis.manager.common.entity.node.AMEMNode;
+import org.apache.linkis.manager.common.entity.node.EMNode;
 import org.apache.linkis.manager.common.entity.node.EngineNode;
 import 
org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
-import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest;
-import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest;
-import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
+import org.apache.linkis.manager.common.protocol.engine.*;
 import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
 import 
org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
 import org.apache.linkis.manager.label.entity.Label;
 import org.apache.linkis.manager.label.entity.UserModifiable;
 import org.apache.linkis.manager.label.exception.LabelErrorException;
@@ -65,6 +65,7 @@ import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.*;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -103,6 +104,8 @@ public class EngineRestfulApi {
 
   @Autowired private ECResourceInfoService ecResourceInfoService;
 
+  @Autowired private EngineReuseService engineReuseService;
+
   private final ObjectMapper objectMapper = new ObjectMapper();
 
   private LabelBuilderFactory stdLabelBuilderFactory =
@@ -110,6 +113,183 @@ public class EngineRestfulApi {
 
   private static final Logger logger = 
LoggerFactory.getLogger(EngineRestfulApi.class);
 
+  @ApiOperation(value = "askEngineConn", response = Message.class)
+  @ApiOperationSupport(ignoreParameters = {"jsonNode"})
+  @RequestMapping(path = "/askEngineConn", method = RequestMethod.POST)
+  public Message askEngineConn(
+      HttpServletRequest req, @RequestBody EngineAskRequest engineAskRequest)
+      throws IOException, InterruptedException {
+    String userName = ModuleUserUtils.getOperationUser(req, "askEngineConn");
+    engineAskRequest.setUser(userName);
+    long timeout = engineAskRequest.getTimeOut();
+    if (timeout <= 0) {
+      timeout = 
AMConfiguration.ENGINE_CONN_START_REST_MAX_WAIT_TIME.getValue().toLong();
+      engineAskRequest.setTimeOut(timeout);
+    }
+    Map<String, Object> retEngineNode = new HashMap<>();
+    logger.info(
+        "User {} try to ask an engineConn with maxStartTime {}. 
EngineAskRequest is {}.",
+        userName,
+        ByteTimeUtils.msDurationToString(timeout),
+        engineAskRequest);
+    Sender sender = Sender.getSender(Sender.getThisServiceInstance());
+    EngineNode engineNode = null;
+
+    // try to reuse ec first
+    String taskId = 
JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
+    LoggerUtils.setJobIdMDC(taskId);
+    logger.info("received task : {}, engineAskRequest : {}", taskId, 
engineAskRequest);
+    if 
(!engineAskRequest.getLabels().containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
+      EngineReuseRequest engineReuseRequest = new EngineReuseRequest();
+      engineReuseRequest.setLabels(engineAskRequest.getLabels());
+      engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut());
+      engineReuseRequest.setUser(engineAskRequest.getUser());
+      engineReuseRequest.setProperties(engineAskRequest.getProperties());
+      boolean end = false;
+      EngineNode reuseNode = null;
+      int count = 0;
+      int MAX_RETRY = 2;
+      while (!end) {
+        try {
+          reuseNode = engineReuseService.reuseEngine(engineReuseRequest, 
sender);
+          end = true;
+        } catch (LinkisRetryException e) {
+          logger.error(
+              "task: {}, user: {} reuse engine failed", taskId, 
engineReuseRequest.getUser(), e);
+          Thread.sleep(1000);
+          end = false;
+          count += 1;
+          if (count > MAX_RETRY) {
+            end = true;
+          }
+        } catch (Exception e1) {
+          logger.info(
+              "task: {} user: {} reuse engine failed", taskId, 
engineReuseRequest.getUser(), e1);
+          end = true;
+        }
+      }
+      if (null != reuseNode) {
+        logger.info(
+            "Finished to ask engine for task: {}, user: {} by reuse node {}",
+            taskId,
+            engineReuseRequest.getUser(),
+            reuseNode);
+        LoggerUtils.removeJobIdMDC();
+        engineNode = reuseNode;
+      }
+    }
+
+    if (null != engineNode) {
+      fillResultEngineNode(retEngineNode, engineNode);
+      return Message.ok("reuse engineConn ended.").data("engine", 
retEngineNode);
+    }
+
+    String engineAskAsyncId = AMUtils.getAsyncId();
+    Callable<Object> createECTask =
+        new Callable() {
+          @Override
+          public Object call() {
+            LoggerUtils.setJobIdMDC(taskId);
+            logger.info(
+                "Task: {}, start to async({}) createEngine: {}",
+                taskId,
+                engineAskAsyncId,
+                engineAskRequest.getCreateService());
+            // 如果原来的labels含engineInstance ,先去掉
+            engineAskRequest.getLabels().remove("engineInstance");
+            EngineCreateRequest engineCreateRequest = new 
EngineCreateRequest();
+            engineCreateRequest.setLabels(engineAskRequest.getLabels());
+            engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
+            engineCreateRequest.setUser(engineAskRequest.getUser());
+            
engineCreateRequest.setProperties(engineAskRequest.getProperties());
+            
engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
+            try {
+              EngineNode createNode = 
engineCreateService.createEngine(engineCreateRequest, sender);
+              long timeout = 0L;
+              if (engineCreateRequest.getTimeout() <= 0) {
+                timeout = 
AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong();
+              } else {
+                timeout = engineCreateRequest.getTimeout();
+              }
+              // useEngine 需要加上超时
+              EngineNode createEngineNode = 
engineNodeManager.useEngine(createNode, timeout);
+              if (null == createEngineNode) {
+                throw new LinkisRetryException(
+                    AMConstant.EM_ERROR_CODE,
+                    "create engine${createNode.getServiceInstance} success, 
but to use engine failed");
+              }
+              logger.info(
+                  "Task: $taskId finished to ask engine for user 
${engineAskRequest.getUser} by create node $createEngineNode");
+              return createEngineNode;
+            } catch (Exception e) {
+              logger.error(
+                  "Task: {} failed to ask engine for user {} by create node", 
taskId, userName, e);
+              return new LinkisRetryException(AMConstant.EM_ERROR_CODE, 
e.getMessage());
+            } finally {
+              LoggerUtils.removeJobIdMDC();
+            }
+          }
+        };
+
+    try {
+      Object rs = createECTask.call();
+      if (rs instanceof LinkisRetryException) {
+        throw (LinkisRetryException) rs;
+      } else {
+        engineNode = (EngineNode) rs;
+      }
+    } catch (LinkisRetryException retryException) {
+      logger.error(
+          "User {} create engineConn failed get retry  exception. can be 
Retry",
+          userName,
+          retryException);
+      return Message.error(
+              String.format(
+                  "Create engineConn failed, caused by %s.",
+                  ExceptionUtils.getRootCauseMessage(retryException)))
+          .data("canRetry", true);
+    } catch (Exception e) {
+      LoggerUtils.removeJobIdMDC();
+      logger.error("User {} create engineConn failed get retry  exception", 
userName, e);
+      return Message.error(
+          String.format(
+              "Create engineConn failed, caused by %s.", 
ExceptionUtils.getRootCauseMessage(e)));
+    }
+
+    LoggerUtils.removeJobIdMDC();
+    fillResultEngineNode(retEngineNode, engineNode);
+    logger.info(
+        "Finished to create a engineConn for user {}. NodeInfo is {}.", 
userName, engineNode);
+    // to transform to a map
+    return Message.ok("create engineConn ended.").data("engine", 
retEngineNode);
+  }
+
+  private void fillNullNode(
+      Map<String, Object> retEngineNode, EngineAskAsyncResponse 
askAsyncResponse) {
+    retEngineNode.put(AMConstant.EC_ASYNC_START_RESULT_KEY, 
AMConstant.EC_ASYNC_START_RESULT_FAIL);
+    retEngineNode.put(
+        AMConstant.EC_ASYNC_START_FAIL_MSG_KEY,
+        "Got null response for asyId : " + askAsyncResponse.getId());
+    retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(), 
Sender.getThisServiceInstance());
+  }
+
+  private void fillResultEngineNode(Map<String, Object> retEngineNode, 
EngineNode engineNode) {
+    retEngineNode.put(
+        AMConstant.EC_ASYNC_START_RESULT_KEY, 
AMConstant.EC_ASYNC_START_RESULT_SUCCESS);
+    retEngineNode.put("serviceInstance", engineNode.getServiceInstance());
+    if (null == engineNode.getNodeStatus()) {
+      engineNode.setNodeStatus(NodeStatus.Starting);
+    }
+    retEngineNode.put(ECConstants.NODE_STATUS_KEY(), 
engineNode.getNodeStatus().toString());
+    retEngineNode.put(ECConstants.EC_TICKET_ID_KEY(), 
engineNode.getTicketId());
+    EMNode emNode = engineNode.getEMNode();
+    if (null != emNode) {
+      retEngineNode.put(
+          ECConstants.ECM_SERVICE_INSTANCE_KEY(), 
engineNode.getEMNode().getServiceInstance());
+    }
+    retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(), 
Sender.getThisServiceInstance());
+  }
+
   @ApiOperation(value = "createEngineConn", response = Message.class)
   @ApiOperationSupport(ignoreParameters = {"jsonNode"})
   @RequestMapping(path = "/createEngineConn", method = RequestMethod.POST)
@@ -149,13 +329,7 @@ public class EngineRestfulApi {
         "Finished to create a engineConn for user {}. NodeInfo is {}.", 
userName, engineNode);
     // to transform to a map
     Map<String, Object> retEngineNode = new HashMap<>();
-    retEngineNode.put("serviceInstance", engineNode.getServiceInstance());
-    if (null == engineNode.getNodeStatus()) {
-      engineNode.setNodeStatus(NodeStatus.Starting);
-    }
-    retEngineNode.put("nodeStatus", engineNode.getNodeStatus().toString());
-    retEngineNode.put("ticketId", engineNode.getTicketId());
-    retEngineNode.put("ecmServiceInstance", 
engineNode.getEMNode().getServiceInstance());
+    fillResultEngineNode(retEngineNode, engineNode);
     return Message.ok("create engineConn succeed.").data("engine", 
retEngineNode);
   }
 
@@ -173,6 +347,7 @@ public class EngineRestfulApi {
     } catch (Exception e) {
       logger.info("Instances {} does not exist", 
serviceInstance.getInstance());
     }
+    String ecMetrics = null;
     if (null == engineNode) {
       ECResourceInfoRecord ecInfo = null;
       if (null != ticketIdNode) {
@@ -189,12 +364,19 @@ public class EngineRestfulApi {
       if (null == ecInfo) {
         return Message.error("Instance does not exist " + serviceInstance);
       }
+      if (null == ecMetrics) {
+        ecMetrics = ecInfo.getMetrics();
+      }
       engineNode = ECResourceInfoUtils.convertECInfoTOECNode(ecInfo);
+    } else {
+      ecMetrics = engineNode.getEcMetrics();
     }
     if (!userName.equals(engineNode.getOwner()) && 
Configuration.isNotAdmin(userName)) {
       return Message.error("You have no permission to access EngineConn " + 
serviceInstance);
     }
-    return Message.ok().data("engine", engineNode);
+    Message result = Message.ok().data("engine", engineNode);
+    result.data(AMConstant.EC_METRICS_KEY, ecMetrics);
+    return result;
   }
 
   @ApiOperation(value = "kill egineconn", notes = "kill engineconn", response 
= Message.class)
@@ -487,6 +669,11 @@ public class EngineRestfulApi {
     ServiceInstance serviceInstance = getServiceInstance(jsonNode);
     logger.info("User {} try to execute Engine Operation {}.", userName, 
serviceInstance);
     EngineNode engineNode = engineNodeManager.getEngineNode(serviceInstance);
+    if (null == engineNode) {
+      return Message.ok()
+          .data("isError", true)
+          .data("errorMsg", "Ec : " + serviceInstance.toString() + " not 
found.");
+    }
     if (!userName.equals(engineNode.getOwner()) && 
Configuration.isNotAdmin(userName)) {
       return Message.error("You have no permission to execute Engine Operation 
" + serviceInstance);
     }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
index 36675ff84..ab5799063 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
@@ -22,6 +22,7 @@ import org.apache.linkis.governance.common.utils.JobUtils;
 import org.apache.linkis.governance.common.utils.LoggerUtils;
 import org.apache.linkis.manager.am.conf.AMConfiguration;
 import org.apache.linkis.manager.am.util.LinkisUtils;
+import org.apache.linkis.manager.am.utils.AMUtils;
 import org.apache.linkis.manager.common.constant.AMConstant;
 import org.apache.linkis.manager.common.entity.node.EngineNode;
 import org.apache.linkis.manager.common.protocol.engine.*;
@@ -36,7 +37,6 @@ import org.springframework.stereotype.Service;
 
 import java.net.SocketTimeoutException;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import feign.RetryableException;
 import org.slf4j.Logger;
@@ -50,8 +50,6 @@ public class DefaultEngineAskEngineService extends 
AbstractEngineService
 
   private EngineCreateService engineCreateService;
   private EngineReuseService engineReuseService;
-  private AtomicInteger idCreator = new AtomicInteger();
-  private String idPrefix = Sender.getThisServiceInstance().getInstance();
 
   private static final ThreadPoolExecutor EXECUTOR =
       LinkisUtils.newCachedThreadPool(
@@ -103,7 +101,7 @@ public class DefaultEngineAskEngineService extends 
AbstractEngineService
       }
     }
 
-    String engineAskAsyncId = getAsyncId();
+    String engineAskAsyncId = AMUtils.getAsyncId();
     CompletableFuture<EngineNode> createNodeThread =
         CompletableFuture.supplyAsync(
             () -> {
@@ -197,8 +195,4 @@ public class DefaultEngineAskEngineService extends 
AbstractEngineService
     LoggerUtils.removeJobIdMDC();
     return new EngineAskAsyncResponse(engineAskAsyncId, 
Sender.getThisServiceInstance());
   }
-
-  private String getAsyncId() {
-    return idPrefix + "_" + idCreator.getAndIncrement();
-  }
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
index 5fc852966..85c7470ce 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
@@ -125,6 +125,7 @@ public class ECResourceInfoUtils {
     engineNode.setTicketId(ecInfo.getTicketId());
     engineNode.setStartTime(ecInfo.getCreateTime());
     engineNode.setUpdateTime(ecInfo.getReleaseTime());
+    engineNode.setEcMetrics(ecInfo.getMetrics());
     return engineNode;
   }
 
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
index 43144e53f..660d39323 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
@@ -25,11 +25,13 @@ import 
org.apache.linkis.manager.common.entity.node.EngineNode;
 import org.apache.linkis.manager.common.entity.resource.*;
 import org.apache.linkis.manager.label.entity.Label;
 import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.rpc.Sender;
 
 import java.lang.reflect.Type;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,6 +43,9 @@ import org.slf4j.LoggerFactory;
 public class AMUtils {
   private static final Logger logger = LoggerFactory.getLogger(AMUtils.class);
 
+  private static final AtomicInteger idCreator = new AtomicInteger();
+  private static String idPrefix = 
Sender.getThisServiceInstance().getInstance();
+
   private static Gson GSON =
       new GsonBuilder()
           .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -335,4 +340,8 @@ public class AMUtils {
       return false;
     }
   }
+
+  public static String getAsyncId() {
+    return idPrefix + "_" + idCreator.getAndIncrement();
+  }
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index bcb14a704..362932083 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -63,4 +63,6 @@ public class LabelKeyConstant {
   public static final String TENANT_KEY = "tenant";
 
   public static final String FIXED_EC_KEY = "fixedEngineConn";
+
+  public static final String MANAGER_KEY = "manager";
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java
new file mode 100644
index 000000000..674cc605a
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EngineNodeLabel;
+import org.apache.linkis.manager.label.entity.Feature;
+import org.apache.linkis.manager.label.entity.GenericLabel;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+
+import java.util.HashMap;
+
+public class ManagerLabel extends GenericLabel implements EngineNodeLabel {
+
+  public ManagerLabel() {
+    setLabelKey(LabelKeyConstant.MANAGER_KEY);
+  }
+
+  @Override
+  public Feature getFeature() {
+    return Feature.CORE;
+  }
+
+  public String getManager() {
+    if (null == getValue()) {
+      return null;
+    }
+    return getValue().get(getLabelKey());
+  }
+
+  @ValueSerialNum(0)
+  public void setManager(String manager) {
+    if (null == getValue()) {
+      setValue(new HashMap<>());
+    }
+    getValue().put(getLabelKey(), manager);
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index a24445269..21a067ed4 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -42,6 +42,7 @@ object RunType extends Enumeration {
   val ES_JSON = Value("esjson")
 
   val TRINO_SQL = Value("tsql")
+  val JSON = Value("json")
 
 
   val SEATUNNEL_ZETA = Value("szeta")
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
index 081e5e605..09d802a95 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
@@ -34,4 +34,30 @@ public class AMConstant {
   public static final String START_REASON = "start_reason";
 
   public static final String EC_CAN_RETRY = "ec_can_try";
+
+  public static final String EC_ASYNC_START_ID_KEY = "ecAsyncStartId";
+
+  public static final String EC_ASYNC_START_MANAGER_INSTANCE_KEY = 
"managerInstance";
+
+  /*
+  result : starting,success,failed
+   */
+  public static final String EC_ASYNC_START_RESULT_KEY = "ecAsyncStartResult";
+
+  /*
+  default false
+   */
+  public static final String EC_SYNC_START_KEY = "ecSyncStart";
+
+  public static final String EC_ASYNC_START_RESULT_SUCCESS = "success";
+
+  public static final String EC_ASYNC_START_RESULT_FAIL = "failed";
+
+  public static final String EC_ASYNC_START_RESULT_STARTING = "starting";
+
+  public static final String EC_ASYNC_START_FAIL_RETRY_KEY = "canRetry";
+
+  public static final String EC_ASYNC_START_FAIL_MSG_KEY = "failMsg";
+
+  public static final String EC_METRICS_KEY = "ecMetrics";
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
index 7343aab2d..a560eec33 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
@@ -62,6 +62,8 @@ public class AMEngineNode implements EngineNode, 
ScoreServiceInstance {
 
   private String ticketId;
 
+  private String ecMetrics;
+
   public AMEngineNode() {}
 
   public AMEngineNode(double score, ServiceInstance serviceInstance) {
@@ -210,6 +212,16 @@ public class AMEngineNode implements EngineNode, 
ScoreServiceInstance {
     this.ticketId = ticketId;
   }
 
+  @Override
+  public String getEcMetrics() {
+    return ecMetrics;
+  }
+
+  @Override
+  public void setEcMetrics(String metrics) {
+    this.ecMetrics = metrics;
+  }
+
   @Override
   public Date getUpdateTime() {
     return updateTime;
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
index e3b8548bf..627b41bc5 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
@@ -26,4 +26,12 @@ public interface EngineNode extends AMNode, RMNode, 
LabelNode {
   String getLock();
 
   void setLock(String lock);
+
+  String getTicketId();
+
+  void setTicketId(String ticketId);
+
+  String getEcMetrics();
+
+  void setEcMetrics(String metrics);
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
index 8637887e5..a3e02f021 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
@@ -58,7 +58,10 @@ public class PersistencerEcNodeInfo extends PersistenceNode {
         + ", engineType='"
         + engineType
         + '\''
-        + "} "
+        + ", heartbeatMsg='"
+        + heartbeatMsg
+        + '\''
+        + '}'
         + super.toString();
   }
 }
diff --git 
a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java
 
b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java
deleted file mode 100644
index ddf4838cd..000000000
--- 
a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineconnplugin.flink.operator;
-
-import org.apache.linkis.common.exception.WarnException;
-import 
org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager$;
-import 
org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter;
-import 
org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary;
-import 
org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
-import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor;
-import org.apache.linkis.manager.common.operator.Operator;
-
-import java.text.MessageFormat;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TriggerSavepointOperator implements Operator {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(TriggerSavepointOperator.class);
-
-  @Override
-  public String[] getNames() {
-    return new String[] {"doSavepoint"};
-  }
-
-  @Override
-  public Map<String, Object> apply(Map<String, Object> parameters) {
-    String savepoint = getAsThrow(parameters, "savepointPath");
-    String mode = getAsThrow(parameters, "mode");
-    logger.info("try to " + mode + " savepoint with path " + savepoint + ".");
-
-    if (OnceExecutorManager$.MODULE$.getInstance().getReportExecutor()
-        instanceof FlinkOnceExecutor) {
-      FlinkOnceExecutor flinkExecutor =
-          (FlinkOnceExecutor) 
OnceExecutorManager$.MODULE$.getInstance().getReportExecutor();
-      ClusterDescriptorAdapter clusterDescriptorAdapter =
-          (ClusterDescriptorAdapter) 
flinkExecutor.getClusterDescriptorAdapter();
-      String writtenSavepoint = "";
-      try {
-        writtenSavepoint = clusterDescriptorAdapter.doSavepoint(savepoint, 
mode);
-      } catch (JobExecutionException e) {
-        logger.info("doSavepoint failed", e);
-        throw new RuntimeException(e);
-      }
-
-      Map<String, Object> stringMap = new HashMap<>();
-      stringMap.put("writtenSavepoint", writtenSavepoint);
-      return stringMap;
-    } else {
-      throw new WarnException(
-          FlinkErrorCodeSummary.NOT_SUPPORT_SAVEPOTION.getErrorCode(),
-          MessageFormat.format(
-              FlinkErrorCodeSummary.NOT_SUPPORT_SAVEPOTION.getErrorDesc(),
-              OnceExecutorManager$.MODULE$
-                  .getInstance()
-                  .getReportExecutor()
-                  .getClass()
-                  .getSimpleName()));
-    }
-  }
-
-  public <T> T getAsThrow(Map<String, Object> parameters, String key) {
-    Object value = parameters.get(key);
-    if (value != null) {
-      try {
-        return (T) value;
-      } catch (Exception e) {
-        throw new IllegalArgumentException("parameter " + key + " is 
invalid.", e);
-      }
-    } else {
-      throw new IllegalArgumentException("parameter " + key + " is required.");
-    }
-  }
-}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index 5f93b690e..8115a128c 100644
--- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -132,6 +132,9 @@ object FlinkEnvConfiguration {
   val FLINK_ONCE_APP_STATUS_FETCH_INTERVAL =
     CommonVars("flink.app.fetch.status.interval", new TimeType("5s"))
 
+  val FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL =
+    CommonVars("flink.app.report.appid.interval", new TimeType("60s"))
+
   val FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX = 
CommonVars("flink.app.fetch.status.failed.num", 3)
 
   val FLINK_REPORTER_ENABLE = CommonVars("linkis.flink.reporter.enable", false)
@@ -153,4 +156,18 @@ object FlinkEnvConfiguration {
   val FLINK_PARAMS_BLANK_PLACEHOLER =
     CommonVars("linkis.flink.params.placeholder.blank", "\u0001")
 
+  val FLINK_MANAGER_MODE_CONFIG_KEY = 
CommonVars("linkis.flink.manager.mode.on", false)
+
+  val FLINK_MANAGER_LOAD_TASK_MAX = 
CommonVars("linkis.flink.manager.load.task.max", 50)
+
+  val HADOOP_CONF_DIR = CommonVars("linkis.flink.hadoop.conf.dir", 
System.getenv("HADOOP_CONF_DIR"))
+
+  val FLINK_MANAGER_CLIENT_MAX_NUM = CommonVars("linkis.flink.client.num.max", 
200)
+
+  val FLINK_MANAGER_CLIENT_EXPIRE_MILLS =
+    CommonVars("linkis.flink.client.expire.mills", 3600 * 1000)
+
+  val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
+    CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)
+
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala
similarity index 56%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
copy to 
linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala
index 53cdd44b0..e0519e548 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala
@@ -15,25 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineconn.acessible.executor.conf
+package org.apache.linkis.engineconnplugin.flink.config
 
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineconn.acessible.executor.info.{
-  DefaultNodeOverLoadInfoManager,
-  NodeOverLoadInfoManager
-}
+import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.engineconn.acessible.executor.service.{
   EngineConnConcurrentLockService,
   EngineConnTimedLockService,
   LockService
 }
 import 
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
+import org.apache.linkis.engineconnplugin.flink.util.ManagerUtil
 
 import 
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
 import org.springframework.context.annotation.{Bean, Configuration}
 
 @Configuration
-class AccessibleExecutorSpringConfiguration extends Logging {
+class FlinkSrpingConfiguration extends Logging {
 
   private val asyncListenerBusContext =
     
ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus
@@ -43,18 +40,41 @@ class AccessibleExecutorSpringConfiguration extends Logging 
{
   def createLockManager(): LockService = {
 
     val lockService =
-      if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+      if (ManagerUtil.isManager) {
+        logger.info("Engine is manager, supports parallelism.")
         new EngineConnConcurrentLockService
-      } else new EngineConnTimedLockService
+      } else {
+        logger.info("Engine is not manager, doesn't support parallelism.")
+        new EngineConnTimedLockService
+      }
     asyncListenerBusContext.addListener(lockService)
+    FlinkLockerServiceHolder.registerLockService(lockService)
     lockService
   }
 
-  @Bean
-  @ConditionalOnMissingBean(Array(classOf[NodeOverLoadInfoManager]))
-  def createNodeOverLoadInfoManager(): NodeOverLoadInfoManager = {
+}
+
+object FlinkLockerServiceHolder extends Logging {
 
-    new DefaultNodeOverLoadInfoManager
+  private var lockService: LockService = _
+
+  def registerLockService(service: LockService): Unit = {
+    Utils.tryAndError {
+      if (null != service) {
+        if (null == lockService) {
+          logger.info(s"Will register lockService : 
${service.getClass.getName}")
+          lockService = service
+        } else {
+          logger.warn(
+            s"Default lockService has been registered to 
${lockService.getClass.getName}, will not register : 
${service.getClass.getName}"
+          )
+        }
+      } else {
+        logger.warn("Cannot register null lockService")
+      }
+    }
   }
 
+  def getDefaultLockService(): LockService = lockService
+
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala
similarity index 65%
copy from 
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to 
linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala
index e3b8548bf..6b3a0d356 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala
@@ -15,15 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.engineconnplugin.flink.constants
 
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+object FlinkECConstant {
 
-  EMNode getEMNode();
+  val FLINK_MANAGER_OPERATION_TYPE_KEY = "operationType"
 
-  void setEMNode(EMNode emNode);
+  val FLINK_OPERATION_BOUNDARY_KEY = "operationBoundary"
 
-  String getLock();
+  val EC_INSTANCE_KEY = "ecInstance"
 
-  void setLock(String lock);
+  val MSG_KEY = "msg"
+
+  val SNAPHOT_KEY = "snapshot"
+
+  val SAVAPOINT_PATH_KEY = "savepointPath"
+
+  val SAVEPOINT_MODE_KEY = "mode"
+
+  val RESULT_SAVEPOINT_KEY = "writtenSavepoint"
 }
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
index 40bc732e5..eac436851 100644
--- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
@@ -52,6 +52,8 @@ class EnvironmentContext(
 
   private var deploymentTarget: String = YarnDeploymentTarget.PER_JOB.getName
 
+  private var extraParams: util.Map[String, Any] = _
+
   def this(
       defaultEnv: Environment,
       systemConfiguration: Configuration,
@@ -63,7 +65,8 @@ class EnvironmentContext(
       providedLibDirsArray: Array[String],
       shipDirsArray: Array[String],
       dependencies: util.List[URL],
-      flinkExecutionTarget: String
+      flinkExecutionTarget: String,
+      extraParams: util.Map[String, Any]
   ) {
     this(
       defaultEnv,
@@ -89,6 +92,8 @@ class EnvironmentContext(
       this.flinkConfig.set(LinkisYarnClusterClientFactory.YARN_CONFIG_DIR, 
this.yarnConfDir)
       // set flink dist-jar(设置 flink dist jar)
       this.flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR, distJarPath)
+      // other params
+      this.extraParams = extraParams
     }
   }
 
@@ -114,6 +119,13 @@ class EnvironmentContext(
 
   def getDependencies: util.List[URL] = dependencies
 
+  def setExtraParams(params: util.Map[String, Any]): EnvironmentContext = {
+    this.extraParams = params
+    this
+  }
+
+  def getExtraParams(): util.Map[String, Any] = extraParams
+
   override def equals(o: Any): Boolean = o match {
     case context: EnvironmentContext =>
       if (this eq context) return true
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
index 8e2da4fbe..659cfcbc2 100644
--- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
@@ -18,13 +18,27 @@
 package org.apache.linkis.engineconnplugin.flink.executor
 
 import org.apache.linkis.common.utils.Utils
+import 
org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
+import org.apache.linkis.engineconn.executor.service.ManagerService
 import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
-import 
org.apache.linkis.engineconnplugin.flink.client.deployment.AbstractApplicationClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.client.deployment.{
+  AbstractApplicationClusterDescriptorAdapter,
+  YarnApplicationClusterDescriptorAdapter
+}
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
 import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
 import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 
 import org.apache.commons.lang3.StringUtils
 
+import java.util
+import java.util.concurrent.{Future, TimeUnit}
+
 import scala.concurrent.duration.Duration
 
 class FlinkJarOnceExecutor(
@@ -32,6 +46,10 @@ class FlinkJarOnceExecutor(
     override protected val flinkEngineConnContext: FlinkEngineConnContext
 ) extends FlinkOnceExecutor[AbstractApplicationClusterDescriptorAdapter] {
 
+  private var daemonThread: Future[_] = _
+
+  private var firstReportAppIdTimestampMills: Long = 0L
+
   override def doSubmit(
       onceExecutorExecutionContext: OnceExecutorExecutionContext,
       options: Map[String, String]
@@ -48,6 +66,73 @@ class FlinkJarOnceExecutor(
     Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
     setJobID(clusterDescriptor.getJobId.toHexString)
     super.waitToRunning()
+    if 
(YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams()))
 {
+      waitToExit()
+    }
+  }
+
+  override def close(): Unit = {
+    super.close()
+    if (null != daemonThread) {
+      daemonThread.cancel(true)
+    }
+  }
+
+  override protected def closeYarnApp(): Unit = {
+    if 
(YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams()))
 {
+      if (getStatus == NodeStatus.Failed) {
+        logger.info("Will kill yarn app on close with clientType : detach, 
because status failed.")
+        super.closeYarnApp()
+      } else {
+        logger.info("Skip to kill yarn app on close with clientType : detach.")
+      }
+    } else {
+      logger.info("Will kill yarn app on close with clientType : attach.")
+      super.closeYarnApp()
+    }
+  }
+
+  private def waitToExit(): Unit = {
+    // upload applicationId to manager and then exit
+    val thisExecutor = this
+    if (!isCompleted) {
+      daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
+        new Runnable {
+          override def run(): Unit = {
+            if (!isCompleted) {
+              Utils.waitUntil(() => StringUtils.isNotBlank(getApplicationId), 
Duration.apply("10s"))
+              if (StringUtils.isNotBlank(getApplicationId)) {
+                Utils.tryAndWarn {
+                  val heartbeatService = 
ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
+                  if (null == heartbeatService) {
+                    logger.error("HeartbeatService not inited.")
+                    return null
+                  }
+                  val heartbeatMsg = 
heartbeatService.generateHeartBeatMsg(thisExecutor)
+                  
ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
+                  logger.info(
+                    s"Succeed to report heatbeatMsg : 
${heartbeatMsg.getHeartBeatMsg}, will add handshake."
+                  )
+                  if (0L >= firstReportAppIdTimestampMills) {
+                    firstReportAppIdTimestampMills = System.currentTimeMillis()
+                  }
+                  if (!StatusOperator.isHandshaked) {
+                    StatusOperator.addHandshake()
+                  } else {
+                    logger.info("Will exit with handshaked.")
+                    trySucceed()
+                  }
+                }
+              }
+            }
+          }
+        },
+        1000,
+        
FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
+        TimeUnit.MILLISECONDS
+      )
+      logger.info("waitToExit submited.")
+    }
   }
 
 }
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
new file mode 100644
index 000000000..b204c2405
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.executor
+
+import org.apache.linkis.common.utils.Logging
+import 
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
+import 
org.apache.linkis.engineconn.acessible.executor.service.EngineConnConcurrentLockService
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.computation.executor.execute.{
+  ComputationExecutor,
+  ConcurrentComputationExecutor,
+  EngineExecutionContext
+}
+import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
+import 
org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.config.FlinkLockerServiceHolder
+import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.util.ManagerUtil
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.scheduler.executer.{
+  AsynReturnExecuteResponse,
+  ErrorExecuteResponse,
+  ExecuteResponse
+}
+
+class FlinkManagerConcurrentExecutor(
+    val id: Long,
+    maxRunningNumber: Int,
+    val flinkEngineConnContext: FlinkEngineConnContext
+) extends FlinkOnceExecutor[ClusterDescriptorAdapter]
+    with FlinkExecutor
+    with Logging {
+
+  override protected def submit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext
+  ): Unit = {
+    logger.info("Succeed to init FlinkManagerExecutor.")
+  }
+
+  override def execute(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext
+  ): ExecuteResponse = {
+    val isManager = ManagerUtil.isManager
+    val lockService = FlinkLockerServiceHolder.getDefaultLockService()
+    if (
+        isManager && null != lockService && lockService
+          .isInstanceOf[EngineConnConcurrentLockService]
+    ) {
+      val msg = "Succeed to init FlinkManagerExecutor."
+      logger.info(msg)
+      new AsynReturnExecuteResponse {
+        override def notify(rs: ExecuteResponse => Unit): Unit = {
+          logger.info(s"FlinkManagerExecutor will skip listener : ${rs}")
+        }
+      }
+    } else {
+      ErrorExecuteResponse(
+        "FlinkManagerExecutor got default lockService is not instance of 
EngineConnConcurrentLockService, will shutdown.",
+        null
+      )
+    }
+  }
+
+  override def getId: String = id.toString
+
+  override def close(): Unit = {
+    logger.info(s"FlinkManagerExecutor : ${getId} will close.")
+    super.close()
+  }
+
+  def getMaxRunningNumber: Int = maxRunningNumber
+
+  def getFlinkContext(): FlinkEngineConnContext = flinkEngineConnContext
+
+  override def doSubmit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext,
+      options: Map[String, String]
+  ): Unit = submit(onceExecutorExecutionContext)
+
+  override protected def initOnceExecutorExecutionContext(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext
+  ): Unit = {}
+
+  override protected def createOnceExecutorExecutionContext(
+      engineCreationContext: EngineCreationContext
+  ): OnceExecutorExecutionContext = new OnceExecutorExecutionContext(null, 
null)
+
+  override def tryReady(): Boolean = {
+    // set default status to Unlock
+    transition(NodeStatus.Unlock)
+    true
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
index f3d15a7e9..c30e88584 100644
--- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
@@ -120,9 +120,7 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
     if (daemonThread != null) daemonThread.cancel(true)
   }
 
-  override def close(): Unit = {
-    super.close()
-    closeDaemon()
+  protected def closeYarnApp(): Unit = {
     if (clusterDescriptor != null) {
       clusterDescriptor.cancelJob()
       clusterDescriptor.close()
@@ -130,6 +128,12 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
     flinkEngineConnContext.getExecutionContext.getClusterClientFactory.close()
   }
 
+  override def close(): Unit = {
+    super.close()
+    closeDaemon()
+    closeYarnApp()
+  }
+
   override protected def waitToRunning(): Unit = {
     if (!isCompleted) {
       daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index e6cdcc5da..3d1b83447 100644
--- 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -17,8 +17,11 @@
 
 package org.apache.linkis.engineconnplugin.flink.factory
 
+import org.apache.linkis.common.conf.CommonVars
 import org.apache.linkis.common.utils.{ClassUtils, Logging}
+import 
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
 import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.launch.EngineConnServer
 import org.apache.linkis.engineconnplugin.flink.client.config.Environment
 import 
org.apache.linkis.engineconnplugin.flink.client.config.entries.ExecutionEntry
 import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext
@@ -32,7 +35,8 @@ import 
org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli
 import 
org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
 import 
org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
 import org.apache.linkis.engineconnplugin.flink.setting.Settings
-import org.apache.linkis.engineconnplugin.flink.util.ClassUtil
+import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
 import org.apache.linkis.manager.engineplugin.common.creation.{
   ExecutorFactory,
@@ -41,6 +45,7 @@ import 
org.apache.linkis.manager.engineplugin.common.creation.{
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine._
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.protocol.utils.TaskUtils
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.configuration._
@@ -108,6 +113,19 @@ class FlinkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
     val providedLibDirsArray = 
FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
     // Ship directories
     val shipDirsArray = getShipDirectories(options)
+    // other params
+    val flinkClientType = 
GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue(options)
+    val otherParams = new util.HashMap[String, Any]()
+    val isManager = ManagerUtil.isManager
+    if (isManager) {
+//      logger.info(
+//        s"flink manager mode on. Will set 
${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.key} to true."
+//      )
+      logger.info(
+        s"support parallelism : 
${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()}"
+      )
+    }
+    otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, 
flinkClientType.toLowerCase())
     val context = new EnvironmentContext(
       defaultEnv,
       new Configuration,
@@ -119,7 +137,8 @@ class FlinkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
       providedLibDirsArray,
       shipDirsArray,
       new util.ArrayList[URL],
-      flinkExecutionTarget
+      flinkExecutionTarget,
+      otherParams
     )
     // Step1: environment-level configurations
     val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink")
@@ -445,8 +464,14 @@ class FlinkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
   ): FlinkEngineConnContext =
     new FlinkEngineConnContext(environmentContext)
 
-  override protected def getDefaultExecutorFactoryClass: Class[_ <: 
ExecutorFactory] =
-    classOf[FlinkCodeExecutorFactory]
+  override protected def getDefaultExecutorFactoryClass: Class[_ <: 
ExecutorFactory] = {
+    val options = EngineConnServer.getEngineCreationContext.getOptions
+    if (FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue(options)) 
{
+      classOf[FlinkManagerExecutorFactory]
+    } else {
+      classOf[FlinkCodeExecutorFactory]
+    }
+  }
 
   override protected def getEngineConnType: EngineType = EngineType.FLINK
 
@@ -454,7 +479,8 @@ class FlinkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
     ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new 
FlinkSQLExecutorFactory),
     ClassUtil
       .getInstance(classOf[FlinkApplicationExecutorFactory], new 
FlinkApplicationExecutorFactory),
-    ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new 
FlinkCodeExecutorFactory)
+    ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new 
FlinkCodeExecutorFactory),
+    ClassUtil.getInstance(classOf[FlinkManagerExecutorFactory], new 
FlinkManagerExecutorFactory)
   )
 
   override def getExecutorFactories: Array[ExecutorFactory] = 
executorFactoryArray
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala
new file mode 100644
index 000000000..6bdf43297
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import 
org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory
+import 
org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor
+import org.apache.linkis.engineconn.once.executor.OnceExecutor
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.executor.{
+  FlinkCodeOnceExecutor,
+  FlinkManagerConcurrentExecutor
+}
+import 
org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory.setDefaultExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType._
+
+class FlinkManagerExecutorFactory extends OnceExecutorFactory {
+
+  override protected def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn,
+      labels: Array[Label[_]]
+  ): OnceExecutor = engineConn.getEngineConnSession match {
+    case flinkEngineConnContext: FlinkEngineConnContext =>
+      val executor = new FlinkManagerConcurrentExecutor(
+        id,
+        FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getValue,
+        flinkEngineConnContext
+      )
+      setDefaultExecutor(executor)
+      executor
+  }
+
+  // just set lots of runType, but now only sql is supported.
+  override protected def getSupportRunTypes: Array[String] =
+    Array(JSON.toString)
+
+  override protected def getRunType: RunType = JSON
+}
+
+object FlinkManagerExecutorFactory {
+
+  private var defaultExecutor: FlinkManagerConcurrentExecutor = _
+
+  def setDefaultExecutor(executor: FlinkManagerConcurrentExecutor): Unit = {
+    defaultExecutor = executor
+  }
+
+  def getDefaultExecutor(): FlinkManagerConcurrentExecutor = defaultExecutor
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
new file mode 100644
index 000000000..78ae65307
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.hook
+
+import org.apache.linkis.common.utils.Logging
+import 
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
+import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import 
org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.manager.common.protocol.engine.{
+  EngineOperateRequest,
+  EngineOperateResponse
+}
+
+import org.springframework.stereotype.Service
+
+import javax.annotation.PostConstruct
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+
+@Service
+class EngineLoadOperationHook extends OperationHook with Logging {
+
+  @PostConstruct
+  private def init(): Unit = {
+    OperationHook.registerOperationHook(this)
+    logger.info(s"${getName()} init success.")
+  }
+
+  private val taskNum = new AtomicInteger(0)
+  private val lock = new Object
+
+  override def getName(): String = getClass.getSimpleName
+
+  override def doPreOperation(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit = {
+    ExecutorManager.getInstance.getReportExecutor match {
+      case accessibleExecutor: AccessibleExecutor =>
+        accessibleExecutor.updateLastActivityTime()
+      case _ =>
+    }
+    if (
+        taskNum.incrementAndGet() >= 
FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
+    ) {
+      lock.synchronized {
+        if (
+            taskNum
+              .incrementAndGet() >= 
FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
+        ) {
+          FlinkManagerExecutorFactory.getDefaultExecutor() match {
+            case accessibleExecutor: AccessibleExecutor =>
+              if (NodeStatus.Busy != accessibleExecutor.getStatus) {
+                accessibleExecutor.transition(NodeStatus.Busy)
+                logger.warn("The number of tasks exceeds the maximum limit, 
change status to busy.")
+              }
+            case _ => 
logger.error("FlinkManagerExecutorFactory.getDefaultExecutor() is None.")
+          }
+        }
+      }
+    }
+  }
+
+  override def doPostOperation(
+      engineOperateRequest: EngineOperateRequest,
+      engineOperateResponse: EngineOperateResponse
+  ): Unit = {
+    if (taskNum.get() - 1 < 
FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()) {
+      lock.synchronized {
+        if (
+            taskNum
+              .decrementAndGet() < 
FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
+        ) {
+          FlinkManagerExecutorFactory.getDefaultExecutor() match {
+            case accessibleExecutor: AccessibleExecutor =>
+              if (NodeStatus.Busy == accessibleExecutor.getStatus) {
+                accessibleExecutor.transition(NodeStatus.Unlock)
+                logger.warn(
+                  "The number of tasks is less than the maximum limit, change 
status to unlock."
+                )
+              }
+            case _ => 
logger.error("FlinkManagerExecutorFactory.getDefaultExecutor() is None.")
+          }
+        }
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(s"taskNum: ${taskNum.get()}")
+    }
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
new file mode 100644
index 000000000..1a917526b
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconnplugin.flink.constants.FlinkECConstant
+import 
org.apache.linkis.engineconnplugin.flink.operator.clientmanager.FlinkRestClientManager
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.toScalaMap
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, 
FinalApplicationStatus}
+
+import java.util
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable
+
+class KillOperator extends Operator with Logging {
+
+  override def getNames: Array[String] = Array("kill")
+
+  @throws[GovernanceErrorException]
+  override def apply(params: util.Map[String, Object]): util.Map[String, 
Object] = {
+
+    val rsMap = new mutable.HashMap[String, String]
+    val appIdStr = params.getOrElse(ECConstants.YARN_APPID_NAME_KEY, 
"").asInstanceOf[String]
+    val snapShot = params.getOrElse(FlinkECConstant.SNAPHOT_KEY, 
"false").toString.toBoolean
+
+    val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)
+
+    var isStopped = false
+    val restClient =
+      Utils.tryCatch {
+        FlinkRestClientManager.getFlinkRestClient(appIdStr)
+      } { case e: Exception =>
+        val yarnClient = YarnUtil.getYarnClient()
+        val appReport = yarnClient.getApplicationReport(appId)
+        if (appReport.getFinalApplicationStatus != 
FinalApplicationStatus.UNDEFINED) {
+          // Flink cluster is not running anymore
+          val msg =
+            s"The application ${appIdStr} doesn't run anymore. It has 
previously completed with final status: 
${appReport.getFinalApplicationStatus.toString}"
+          logAndException(msg)
+          isStopped = true
+          null
+        } else {
+          val msg = s"Get client for app ${appIdStr} failed, because : 
${e.getMessage}"
+          throw logAndException(msg)
+        }
+      }
+    if (!isStopped) {
+      if (snapShot) {
+        val checkPointPath =
+          params.getOrElse(FlinkECConstant.SAVAPOINT_PATH_KEY, 
null).asInstanceOf[String]
+        val rs = YarnUtil.triggerSavepoint(appIdStr, checkPointPath, 
restClient)
+        rsMap.put(FlinkECConstant.MSG_KEY, rs)
+      }
+      val jobs = restClient.listJobs().get()
+      if (null == jobs || jobs.isEmpty) {
+        val msg = s"App : ${appIdStr} have no jobs, but is not ended."
+        throw logAndException(msg)
+      }
+      val msg = s"Try to kill ${jobs.size()} jobs of app : ${appIdStr}"
+      jobs.asScala.foreach(job => restClient.cancel(job.getJobId))
+      rsMap += (FlinkECConstant.MSG_KEY -> msg)
+    }
+
+    rsMap.toMap[String, String]
+    val map = new util.HashMap[String, Object]()
+    rsMap.foreach(entry => map.put(entry._1, entry._2))
+    map
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
new file mode 100644
index 000000000..1497763de
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.common.exception.EngineConnException
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import 
org.apache.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorCode
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.{toScalaBuffer, toScalaMap, 
BDPJettyServerHelper}
+
+import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, 
YarnApplicationState}
+
+import java.util
+
+import scala.collection.mutable
+
+class ListOperator extends Operator with Logging {
+
+  private val json = BDPJettyServerHelper.jacksonJson
+
+  override def getNames: Array[String] = Array("list")
+
+  @throws[GovernanceErrorException]
+  override def apply(params: util.Map[String, Object]): util.Map[String, 
Object] = {
+
+    val applicationTypeSet = new util.HashSet[String]()
+    var appStateSet = 
util.EnumSet.of[YarnApplicationState](YarnApplicationState.RUNNING)
+    var appName = ""
+
+    Utils.tryCatch {
+      val appTypeList = params
+        .getOrElse(ECConstants.YARN_APP_TYPE_LIST_KEY, new 
util.ArrayList[String]())
+        .asInstanceOf[util.List[String]]
+      appTypeList.foreach(applicationTypeSet.add)
+      val appStateList = params
+        .getOrElse(ECConstants.YARN_APP_STATE_LIST_KEY, new 
util.ArrayList[String]())
+        .asInstanceOf[util.List[String]]
+      val appStateArray = new util.HashSet[YarnApplicationState]
+      appStateList.foreach(e => 
appStateArray.add(YarnApplicationState.valueOf(e)))
+      if (!appStateArray.isEmpty) {
+        appStateSet = util.EnumSet.copyOf(appStateArray)
+      }
+      appName = params.getOrElse(ECConstants.YARN_APP_NAME_KEY, 
"").asInstanceOf[String]
+    } { e: Throwable =>
+      val msg = "Invalid params. " + e.getMessage
+      logger.error(msg, e)
+      throw new 
EngineConnException(EngineConnExecutorErrorCode.INVALID_PARAMS, msg)
+    }
+
+    val yarnClient = YarnUtil.getYarnClient()
+    val appList = yarnClient.getApplications(applicationTypeSet, appStateSet)
+    val rsMap = new mutable.HashMap[String, String]
+    Utils.tryCatch {
+      val appTypeStr = json.writeValueAsString(applicationTypeSet)
+      val appStateStr = json.writeValueAsString(appStateSet)
+      val rsAppList = new util.ArrayList[util.Map[String, String]]()
+      appList.foreach(report => {
+        if (report.getName.contains(appName)) {
+          val tmpMap = new util.HashMap[String, String]()
+          tmpMap.put(ECConstants.YARN_APP_NAME_KEY, report.getName)
+          tmpMap.put(ECConstants.YARN_APP_TYPE_KEY, report.getApplicationType)
+          tmpMap.put(ECConstants.YARN_APPID_NAME_KEY, 
report.getApplicationId.toString)
+          tmpMap.put(ECConstants.YARN_APP_URL_KEY, report.getTrackingUrl)
+          val appStatus =
+            if (report.getFinalApplicationStatus != 
FinalApplicationStatus.UNDEFINED) {
+              report.getFinalApplicationStatus
+            } else {
+              report.getYarnApplicationState
+            }
+          tmpMap.put(
+            ECConstants.NODE_STATUS_KEY,
+            YarnUtil
+              .convertYarnStateToNodeStatus(report.getApplicationId.toString, 
appStatus.toString)
+              .toString
+          )
+          rsAppList.add(tmpMap)
+        }
+      })
+      val listStr = json.writeValueAsString(rsAppList)
+
+      logger.info(
+        s"List yarn apps, params : appTypeSet : ${appTypeStr}, appStateSet : 
${appStateStr}, list : ${listStr}"
+      )
+
+      rsMap += (ECConstants.YARN_APP_RESULT_LIST_KEY -> listStr)
+    } { case e: Exception =>
+      val msg = "convert listStr failed. Because : " + e.getMessage
+      logger.error(msg)
+      throw e
+    }
+
+    val map = new util.HashMap[String, Object]()
+    rsMap.foreach(e => map.put(e._1, e._2))
+    map
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala
new file mode 100644
index 000000000..05f1200bb
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.exception.{LinkisException, 
LinkisRuntimeException}
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.common.exception.EngineConnException
+import org.apache.linkis.engineconnplugin.flink.util.{ManagerUtil, YarnUtil}
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import 
org.apache.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorCode
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.toScalaMap
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, 
FinalApplicationStatus}
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+
+import java.util
+
+import scala.collection.mutable
+
+class StatusOperator extends Operator with Logging {
+
+  override def getNames: Array[String] = Array("status")
+
+  override def apply(params: util.Map[String, Object]): util.Map[String, 
Object] = {
+
+    val appIdStr = params.getOrElse(ECConstants.YARN_APPID_NAME_KEY, 
"").asInstanceOf[String]
+
+    val parts = appIdStr.split("_")
+    val clusterTimestamp = parts(1).toLong
+    val sequenceNumber = parts(2).toInt
+
+    // Create an ApplicationId object using newInstance method
+    val appId = ApplicationId.newInstance(clusterTimestamp, sequenceNumber)
+    val rsMap = new mutable.HashMap[String, String]
+
+    val yarnClient = YarnUtil.getYarnClient()
+    var appReport: ApplicationReport = null
+    Utils.tryCatch {
+      appReport = yarnClient.getApplicationReport(appId)
+      if (null == appReport) {
+        throw logAndException(s"Got null appReport for appid : ${appIdStr}")
+      }
+    } { case notExist: ApplicationNotFoundException =>
+      logger.error(s"Application : ${appIdStr} not exists, will set the status 
to failed.")
+      val map = new util.HashMap[String, Object]()
+      map.put(ECConstants.NODE_STATUS_KEY, NodeStatus.Failed.toString)
+      map.put(ECConstants.YARN_APPID_NAME_KEY, appIdStr)
+      return map
+    }
+
+    // Get the application status (YarnApplicationState)
+    val appStatus = if (appReport.getFinalApplicationStatus != 
FinalApplicationStatus.UNDEFINED) {
+      appReport.getFinalApplicationStatus
+    } else {
+      appReport.getYarnApplicationState
+    }
+
+    val nodeStatus: NodeStatus = 
YarnUtil.convertYarnStateToNodeStatus(appIdStr, appStatus.toString)
+
+    logger.info(s"try to get appid: ${appIdStr}, status 
${nodeStatus.toString}.")
+    rsMap += (ECConstants.NODE_STATUS_KEY -> nodeStatus.toString)
+    rsMap += (ECConstants.YARN_APPID_NAME_KEY -> appIdStr)
+    val map = new util.HashMap[String, Object]()
+    rsMap.foreach(entry => map.put(entry._1, entry._2))
+    map
+  }
+
+}
+
+object StatusOperator extends Logging {
+
+  private var handshaked: Boolean = false
+
+  def addHandshake(): Unit = {
+    handshaked = true
+  }
+
+  def isHandshaked: Boolean = handshaked
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
new file mode 100644
index 000000000..ceddc367c
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager
+import org.apache.linkis.engineconnplugin.flink.constants.FlinkECConstant
+import 
org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor
+import 
org.apache.linkis.engineconnplugin.flink.operator.clientmanager.FlinkRestClientManager
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import org.apache.linkis.manager.common.operator.Operator
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+
+import java.text.MessageFormat
+import java.util
+
+import scala.collection.mutable
+
+class TriggerSavepointOperator extends Operator with Logging {
+
+  override def getNames: Array[String] = Array("doSavepoint")
+
+  @throws[GovernanceErrorException]
+  override def apply(params: util.Map[String, Object]): util.Map[String, 
Object] = {
+    val rsMap = new mutable.HashMap[String, String]
+
+    val savepointPath = getAsThrow[String](params, 
FlinkECConstant.SAVAPOINT_PATH_KEY)
+    val appIdStr = getAsThrow[String](params, ECConstants.YARN_APPID_NAME_KEY)
+    val mode = getAsThrow[String](params, FlinkECConstant.SAVEPOINT_MODE_KEY)
+
+    val appId = YarnUtil.retrieveApplicationId(appIdStr)
+    val yarnClient = YarnUtil.getYarnClient()
+    val appReport = yarnClient.getApplicationReport(appId)
+    if (appReport.getFinalApplicationStatus != 
FinalApplicationStatus.UNDEFINED) {
+      // Flink cluster is not running anymore
+      val msg =
+        s"The application ${appIdStr} doesn't run anymore. It has previously 
completed with final status: ${appReport.getFinalApplicationStatus.toString}"
+      throw logAndException(msg)
+    }
+
+    logger.info(s"try to $mode savepoint with path $savepointPath.")
+    if (
+        YarnUtil.isDetach(
+          
EngineConnServer.getEngineCreationContext.getOptions.asInstanceOf[util.Map[String,
 Any]]
+        )
+    ) {
+      logger.info("The flink cluster is detached, use rest api to trigger 
savepoint.")
+      val restClient = FlinkRestClientManager.getFlinkRestClient(appIdStr)
+      val rs = YarnUtil.triggerSavepoint(appIdStr, savepointPath, restClient)
+      rsMap.put(FlinkECConstant.RESULT_SAVEPOINT_KEY, rs)
+    } else {
+      logger.info("The flink cluster is not detached, use flink client to 
trigger savepoint.")
+      OnceExecutorManager.getInstance.getReportExecutor match {
+        case flinkExecutor: FlinkOnceExecutor[_] =>
+          val writtenSavepoint =
+            
flinkExecutor.getClusterDescriptorAdapter.doSavepoint(savepointPath, mode)
+          rsMap.put(FlinkECConstant.RESULT_SAVEPOINT_KEY, writtenSavepoint)
+        case executor =>
+          throw new JobExecutionException(
+            NOT_SUPPORT_SAVEPOTION.getErrorDesc + 
executor.getClass.getSimpleName
+              + MessageFormat
+                .format(NOT_SUPPORT_SAVEPOTION.getErrorDesc, 
executor.getClass.getSimpleName)
+          )
+      }
+    }
+    val map = new util.HashMap[String, Object]()
+    rsMap.foreach(entry => map.put(entry._1, entry._2))
+    map
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
new file mode 100644
index 000000000..d7a14beb4
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator.clientmanager
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import 
org.apache.linkis.engineconnplugin.flink.executor.FlinkManagerConcurrentExecutor
+import 
org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.Configuration
+import org.apache.hadoop.yarn.api.records.{ApplicationId, 
FinalApplicationStatus}
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{
+  CacheBuilder,
+  CacheLoader,
+  LoadingCache,
+  RemovalListener,
+  RemovalNotification
+}
+
+object FlinkRestClientManager extends Logging {
+
+  private lazy val restclientCache
+      : LoadingCache[String, RestClusterClient[ApplicationId]] = CacheBuilder
+    .newBuilder()
+    .maximumSize(FlinkEnvConfiguration.FLINK_MANAGER_CLIENT_MAX_NUM.getValue)
+    .expireAfterAccess(
+      FlinkEnvConfiguration.FLINK_MANAGER_CLIENT_EXPIRE_MILLS.getValue,
+      TimeUnit.MILLISECONDS
+    )
+    .weakKeys()
+    .removalListener(new RemovalListener[String, 
RestClusterClient[ApplicationId]]() {
+
+      override def onRemoval(
+          notification: RemovalNotification[String, 
RestClusterClient[ApplicationId]]
+      ): Unit = {
+        logger.info(s"RestClusterClient of AppId : ${notification.getKey} was 
removed.")
+      }
+
+    })
+    .build(new CacheLoader[String, RestClusterClient[ApplicationId]]() {
+
+      override def load(appIdStr: String): RestClusterClient[ApplicationId] = {
+
+        val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)
+
+        val yarnClient = YarnUtil.getYarnClient()
+        val appReport = yarnClient.getApplicationReport(appId)
+
+        if (appReport.getFinalApplicationStatus != 
FinalApplicationStatus.UNDEFINED) {
+          // Flink cluster is not running anymore
+          val msg =
+            s"The application ${appIdStr} doesn't run anymore. It has 
previously completed with final status: 
${appReport.getFinalApplicationStatus.toString}"
+          throw logAndException(msg)
+        }
+
+        val executor = FlinkManagerExecutorFactory.getDefaultExecutor()
+        val tmpFlinkConf: Configuration = executor match {
+          case flinkManagerExecutor: FlinkManagerConcurrentExecutor =>
+            
flinkManagerExecutor.getFlinkContext().getEnvironmentContext.getFlinkConfig.clone()
+          case _ =>
+            val msg = s"Invalid FlinkManagerConcurrentExecutor : ${executor}"
+            throw logAndException(msg)
+        }
+        YarnUtil.setClusterEntrypointInfoToConfig(tmpFlinkConf, appReport)
+        new RestClusterClient[ApplicationId](tmpFlinkConf, 
appReport.getApplicationId)
+      }
+
+    })
+
+  def getFlinkRestClient(appIdStr: String): RestClusterClient[ApplicationId] =
+    restclientCache.get(appIdStr)
+
+  def setFlinkRestClient(appIdStr: String, client: 
RestClusterClient[ApplicationId]): Unit =
+    restclientCache.put(appIdStr, client)
+
+}
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
similarity index 63%
copy from 
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to 
linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
index e3b8548bf..a4dec1cf8 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
@@ -15,15 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.engineconnplugin.flink.util
 
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
 
-  EMNode getEMNode();
+import java.util
 
-  void setEMNode(EMNode emNode);
+object ManagerUtil extends Logging {
 
-  String getLock();
+  val isManager: Boolean = {
+    val options = EngineConnServer.getEngineCreationContext.getOptions
+    FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue(options)
+  }
 
-  void setLock(String lock);
 }
diff --git 
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
new file mode 100644
index 000000000..a1c96619e
--- /dev/null
+++ 
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.util
+
+import org.apache.linkis.common.exception.ErrorException
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconn.executor.entity.YarnExecutor
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.{HighAvailabilityOptions, 
JobManagerOptions, RestOptions}
+import org.apache.flink.runtime.client.JobStatusMessage
+import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{
+  ApplicationId,
+  ApplicationReport,
+  FinalApplicationStatus,
+  YarnApplicationState
+}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+
+import java.util
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable.ArrayBuffer
+
+object YarnUtil extends Logging {
+
+  val CORE_SITE = "core-site.xml"
+  val YARN_SITE = "yarn-site.xml"
+  val HDFS_SITE = "hdfs-site.xml"
+  val MAPRED_SITE = "mapred-site.xml"
+
+  private var yarnClient: YarnClient = _
+
+  def getYarnClient(): YarnClient = {
+    if (null == yarnClient) {
+      YarnUtil.getClass.synchronized {
+        if (null == yarnClient) {
+          yarnClient = createYarnClient()
+        }
+      }
+    }
+    yarnClient
+  }
+
+  private def createYarnClient(): YarnClient = {
+    val yarnClient = YarnClient.createYarnClient()
+    val hadoopConf = getHadoopConf()
+    val yarnConfiguration = new YarnConfiguration(hadoopConf)
+    yarnClient.init(yarnConfiguration)
+    yarnClient.start()
+    yarnClient
+  }
+
+  private def getHadoopConf(): Configuration = {
+    val conf = new Configuration()
+    var confRoot = FlinkEnvConfiguration.HADOOP_CONF_DIR.getValue
+    if (StringUtils.isBlank(confRoot)) {
+      throw new JobExecutionException("HADOOP_CONF_DIR or 
linkis.flink.hadoop.conf.dir not set!")
+    }
+    confRoot = confRoot + "/"
+    conf.addResource(confRoot + HDFS_SITE)
+    conf.addResource(confRoot + CORE_SITE)
+    conf.addResource(confRoot + MAPRED_SITE)
+    conf.addResource(confRoot + YARN_SITE)
+    conf
+  }
+
+  def setClusterEntrypointInfoToConfig(
+      flinkConfiguration: flink.configuration.Configuration,
+      appReport: ApplicationReport
+  ): Unit = {
+    if (null == appReport) {
+      val msg = "Invalid null appReport"
+      logger.error(msg)
+      throw new JobExecutionException(msg)
+    }
+
+    val appId = appReport.getApplicationId
+    val host = appReport.getHost
+    val port = appReport.getRpcPort
+
+    logger.info(s"Found Web Interface ${host}:${port} of application 
'${appId}'.")
+
+    flinkConfiguration.setString(JobManagerOptions.ADDRESS, host)
+    flinkConfiguration.setInteger(JobManagerOptions.PORT, port)
+
+    flinkConfiguration.setString(RestOptions.ADDRESS, host)
+    flinkConfiguration.setInteger(RestOptions.PORT, port)
+
+    flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, 
ConverterUtils.toString(appId))
+
+    if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+      flinkConfiguration.set(HighAvailabilityOptions.HA_CLUSTER_ID, 
ConverterUtils.toString(appId))
+    }
+  }
+
+  def logAndException(msg: String, t: Throwable = null): ErrorException = {
+    logger.error(msg, t)
+    new JobExecutionException(msg)
+  }
+
+  def retrieveApplicationId(appIdStr: String): ApplicationId = {
+    val parts = appIdStr.split("_")
+    val clusterTimestamp = parts(1).toLong
+    val sequenceNumber = parts(2).toInt
+    // Create an ApplicationId object using newInstance method
+    val appId = ApplicationId.newInstance(clusterTimestamp, sequenceNumber)
+    appId
+  }
+
+  def triggerSavepoint(
+      appIdStr: String,
+      checkPointPath: String,
+      restClient: RestClusterClient[ApplicationId]
+  ): String = {
+    val jobs = restClient.listJobs().get()
+    if (null == jobs || jobs.size() > 1) {
+      val size = if (null == jobs) {
+        0
+      } else {
+        jobs.size()
+      }
+      val msg = s"App : ${appIdStr} have ${size} jobs, cannot do snapshot."
+      throw logAndException(msg)
+    }
+    if (StringUtils.isBlank(checkPointPath)) {
+      val msg = s"App : ${appIdStr} checkpoint path is null, cannot do 
checkpoint"
+      throw logAndException(msg)
+    } else {
+      val firstJob = 
jobs.asScala.headOption.getOrElse(null).asInstanceOf[JobStatusMessage]
+      if (null == firstJob) {
+        val msg = s"App : ${appIdStr} got no head job, cannot do checkPoint 
and cancel."
+        throw new JobExecutionException(msg)
+      }
+      val rs = restClient.triggerSavepoint(firstJob.getJobId, 
checkPointPath).get()
+      rs
+    }
+  }
+
+  def convertYarnStateToNodeStatus(appIdStr: String, appStatus: String): 
NodeStatus = {
+    val nodeStatus = appStatus match {
+      case finalState if 
(FinalApplicationStatus.values().map(_.toString).contains(finalState)) =>
+        FinalApplicationStatus.valueOf(finalState) match {
+          case FinalApplicationStatus.KILLED | FinalApplicationStatus.FAILED =>
+            NodeStatus.Failed
+          case FinalApplicationStatus.SUCCEEDED =>
+            NodeStatus.Success
+          case _ =>
+            val msg: String = if (null != appStatus) {
+              s"Application : ${appIdStr} has unknown state : 
${appStatus.toString}"
+            } else {
+              s"Application : ${appIdStr} has null state"
+            }
+            throw new JobExecutionException(msg)
+        }
+      case yarnState if 
(YarnApplicationState.values().map(_.toString).contains(yarnState)) =>
+        YarnApplicationState.valueOf(yarnState) match {
+          case YarnApplicationState.FINISHED =>
+            val msg: String = "Invalid yarn app state : FINISHED"
+            throw new JobExecutionException(msg)
+          case YarnApplicationState.KILLED | YarnApplicationState.FAILED =>
+            NodeStatus.Failed
+          case _ =>
+            NodeStatus.Running
+        }
+      case _ =>
+        val msg: String = if (null != appStatus) {
+          s"Application : ${appIdStr} has unknown state : 
${appStatus.toString}"
+        } else {
+          s"Application : ${appIdStr} has null state"
+        }
+        throw new JobExecutionException(msg)
+    }
+    nodeStatus
+  }
+
+  def isDetach(params: util.Map[String, Any]): Boolean = {
+    val managerOn = params.getOrDefault(
+      FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.key,
+      FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue
+    )
+    if (null != managerOn && managerOn.toString.toBoolean) {
+      return true
+    }
+    val clientType = params
+      .getOrDefault(
+        GovernanceCommonConf.EC_APP_MANAGE_MODE.key,
+        GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue
+      )
+      .toString
+    logger.info(s"clientType : ${clientType}")
+    clientType.toLowerCase() match {
+      case ECConstants.EC_CLIENT_TYPE_DETACH =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  def getAppIds: Array[String] = {
+    val ids = new ArrayBuffer[String]
+    ExecutorManager.getInstance.getExecutors.foreach(executor => {
+      executor match {
+        case yarnExecutor: YarnExecutor =>
+          ids.append(yarnExecutor.getApplicationId)
+        case _ =>
+      }
+    })
+    if (ids.size > 1) {
+      logger.error(
+        "There are more than one yarn application running, please check it. 
Ids : " + ids
+          .mkString(",")
+      )
+    }
+    ids.toArray
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
index b637da27e..abae56eb7 100644
--- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
@@ -97,7 +97,7 @@ class HiveEngineConnFactory extends 
ComputationSingleExecutorEngineConnFactory w
       engineCreationContext: EngineCreationContext
   ): AbstractHiveSession = {
     // if hive engine support concurrent, return HiveConcurrentSession
-    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+    if 
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) {
       return doCreateHiveConcurrentSession(engineCreationContext.getOptions)
     }
 
diff --git 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
index cbfffc244..85baceef6 100755
--- 
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
@@ -38,7 +38,7 @@ class ShellEngineConnFactory extends 
ComputationSingleExecutorEngineConnFactory
       engineCreationContext: EngineCreationContext,
       engineConn: EngineConn
   ): LabelExecutor = {
-    if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+    if 
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) {
       new ShellEngineConnConcurrentExecutor(
         id,
         ShellEngineConnConf.SHELL_ENGINECONN_CONCURRENT_LIMIT


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to