This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 9b6d6c0e4 add support for detach manage mode of flink EC (#4796)
9b6d6c0e4 is described below
commit 9b6d6c0e4a33a11b6c6fb44114adbab4e75da2a0
Author: Alexyang <[email protected]>
AuthorDate: Mon Jul 24 10:29:53 2023 +0800
add support for detach manage mode of flink EC (#4796)
---
.../client/once/LinkisManagerClient.scala | 21 +-
.../client/once/action/AskEngineConnAction.scala | 101 +++++++++
.../client/once/result/AskEngineConnResult.scala} | 14 +-
.../once/result/EngineConnOperateResult.scala | 4 +-
.../client/once/simple/SimpleOnceJob.scala | 4 +
.../impl/EngineConnApplicationInfoOperator.scala | 7 +-
.../common/enums/OnceJobOperationBoundary.java} | 18 +-
.../engineconn/EngineConnExecutorErrorCode.java | 2 +
.../common/conf/GovernanceCommonConf.scala | 4 +
.../common/constant/ec/ECConstants.scala | 39 ++++
.../concurrent/monitor/TimingMonitorService.java | 2 +-
.../service/DefaultNodeHeartbeatMsgManager.scala | 21 +-
.../engineconn/core/executor/ExecutorManager.scala | 2 +-
.../impl/EngineConnApplicationInfoOperator.java | 9 +-
.../conf/AccessibleExecutorConfiguration.scala | 4 +-
.../AccessibleExecutorSpringConfiguration.scala | 8 +-
.../acessible/executor/hook/OperationHook.scala | 50 +++++
.../service/DefaultAccessibleService.scala | 2 +-
.../service/DefaultExecutorHeartbeatService.scala | 1 +
.../executor/service/DefaultOperateService.scala | 47 +++-
.../service/EngineConnTimedLockService.scala | 2 +-
.../service/ExecutorHeartbeatService.scala | 11 +
.../linkis/manager/am/exception/AMErrorCode.java | 6 +-
.../manager/am/restful/EngineRestfulApi.java | 219 ++++++++++++++++--
.../engine/DefaultEngineAskEngineService.java | 10 +-
.../manager/am/util/ECResourceInfoUtils.java | 1 +
.../apache/linkis/manager/am/utils/AMUtils.java | 9 +
.../manager/label/constant/LabelKeyConstant.java | 2 +
.../manager/label/entity/engine/ManagerLabel.java | 53 +++++
.../manager/label/entity/engine/RunType.scala | 1 +
.../linkis/manager/common/constant/AMConstant.java | 26 +++
.../manager/common/entity/node/AMEngineNode.java | 12 +
.../manager/common/entity/node/EngineNode.java | 8 +
.../entity/persistence/PersistencerEcNodeInfo.java | 5 +-
.../flink/operator/TriggerSavepointOperator.java | 92 --------
.../flink/config/FlinkEnvConfiguration.scala | 17 ++
.../flink/config/FlinkSrpingConfiguration.scala | 46 ++--
.../flink/constants/FlinkECConstant.scala | 20 +-
.../flink/context/EnvironmentContext.scala | 14 +-
.../flink/executor/FlinkJarOnceExecutor.scala | 87 +++++++-
.../executor/FlinkManagerConcurrentExecutor.scala | 112 ++++++++++
.../flink/executor/FlinkOnceExecutor.scala | 10 +-
.../flink/factory/FlinkEngineConnFactory.scala | 36 ++-
.../factory/FlinkManagerExecutorFactory.scala | 71 ++++++
.../flink/hook/EngineLoadOperationHook.scala | 112 ++++++++++
.../flink/operator/KillOperator.scala | 92 ++++++++
.../flink/operator/ListOperator.scala | 115 ++++++++++
.../flink/operator/StatusOperator.scala | 99 +++++++++
.../flink/operator/TriggerSavepointOperator.scala | 93 ++++++++
.../clientmanager/FlinkRestClientManager.scala | 97 ++++++++
.../engineconnplugin/flink/util/ManagerUtil.scala | 16 +-
.../engineconnplugin/flink/util/YarnUtil.scala | 245 +++++++++++++++++++++
.../hive/creation/HiveEngineConnFactory.scala | 2 +-
.../shell/factory/ShellEngineConnFactory.scala | 2 +-
54 files changed, 1907 insertions(+), 196 deletions(-)
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
index 3ac3cb7c8..45f3f49be 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/LinkisManagerClient.scala
@@ -17,7 +17,9 @@
package org.apache.linkis.computation.client.once
+import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.once.action.{
+ AskEngineConnAction,
CreateEngineConnAction,
EngineConnOperateAction,
GetEngineConnAction,
@@ -25,6 +27,7 @@ import org.apache.linkis.computation.client.once.action.{
LinkisManagerAction
}
import org.apache.linkis.computation.client.once.result.{
+ AskEngineConnResult,
CreateEngineConnResult,
EngineConnOperateResult,
GetEngineConnResult,
@@ -39,6 +42,8 @@ import java.io.Closeable
trait LinkisManagerClient extends Closeable {
+ def askEngineConn(askEngineConnAction: AskEngineConnAction):
AskEngineConnResult
+
def createEngineConn(createEngineConnAction: CreateEngineConnAction):
CreateEngineConnResult
def getEngineConn(getEngineConnAction: GetEngineConnAction):
GetEngineConnResult
@@ -82,7 +87,21 @@ class LinkisManagerClientImpl(ujesClient: UJESClient)
extends LinkisManagerClien
override def executeEngineConnOperation(
engineConnOperateAction: EngineConnOperateAction
- ): EngineConnOperateResult = execute(engineConnOperateAction)
+ ): EngineConnOperateResult = {
+ Utils.tryCatch {
+ val rs = execute[EngineConnOperateResult](engineConnOperateAction)
+ rs
+ } { case e: Exception =>
+ val rs = new EngineConnOperateResult
+ rs.setIsError(true)
+ rs.setErrorMsg(e.getMessage)
+ rs
+ }
+ }
override def close(): Unit = ujesClient.close()
+
+ override def askEngineConn(askEngineConnAction: AskEngineConnAction):
AskEngineConnResult =
+ execute(askEngineConnAction)
+
}
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala
new file mode 100644
index 000000000..4b89b5376
--- /dev/null
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/AskEngineConnAction.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client.once.action
+
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import org.apache.linkis.httpclient.request.POSTAction
+import org.apache.linkis.ujes.client.exception.UJESJobException
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+class AskEngineConnAction extends POSTAction with LinkisManagerAction {
+
+ override def getRequestPayload: String =
+ DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
+
+ override def suffixURLs: Array[String] = Array("linkisManager",
"askEngineConn")
+
+}
+
+object AskEngineConnAction {
+
+ def newBuilder(): Builder = new Builder
+
+ class Builder private[AskEngineConnAction] () {
+ private var user: String = _
+ private var properties: util.Map[String, String] = _
+ private var labels: util.Map[String, String] = _
+ private var maxSubmitTime: Long = _
+ private var createService: String = _
+ private var description: String = _
+
+ def setUser(user: String): Builder = {
+ this.user = user
+ this
+ }
+
+ def setProperties(properties: util.Map[String, String]): Builder = {
+ this.properties = properties
+ this
+ }
+
+ def setLabels(labels: java.util.Map[String, String]): Builder = {
+ this.labels = labels
+ this
+ }
+
+ def setMaxSubmitTime(maxSubmitTime: Long): Builder = {
+ this.maxSubmitTime = maxSubmitTime
+ this
+ }
+
+ def setCreateService(createService: String): Builder = {
+ this.createService = createService
+ this
+ }
+
+ def setDescription(description: String): Builder = {
+ this.description = description
+ this
+ }
+
+ def build(): AskEngineConnAction = {
+ val action = new AskEngineConnAction()
+ if (user == null) throw new UJESJobException("user is needed!")
+ if (properties == null) properties = new java.util.HashMap[String,
String]
+ if (labels == null) throw new UJESJobException("labels is needed!")
+ action.setUser(user)
+ action.addRequestPayload("properties", properties)
+ action.addRequestPayload("labels", labels)
+ if (StringUtils.isNotBlank(createService)) {
+ action.addRequestPayload("createService", createService)
+ }
+ if (null != maxSubmitTime) {
+ action.addRequestPayload("timeOut", maxSubmitTime)
+ }
+ if (StringUtils.isNotBlank(description)) {
+ action.addRequestPayload("description", description)
+ }
+ action
+ }
+
+ }
+
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
similarity index 75%
copy from
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to
linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
index e3b8548bf..58c6085b4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/AskEngineConnResult.scala
@@ -15,15 +15,9 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.computation.client.once.result
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
- EMNode getEMNode();
-
- void setEMNode(EMNode emNode);
-
- String getLock();
-
- void setLock(String lock);
-}
+@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/askEngineConn")
+class AskEngineConnResult extends GetEngineConnResult
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
index 1bf12e041..50df73bd1 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/result/EngineConnOperateResult.scala
@@ -33,9 +33,11 @@ class EngineConnOperateResult extends LinkisManagerResult {
this.result = result
}
+ def getErrorMsg(): String = errorMsg
+
def setErrorMsg(errorMsg: String): Unit = this.errorMsg = errorMsg
- def setError(isError: Boolean): Unit = this.isError = isError
+ def getIsError(): Boolean = isError
def setIsError(isError: Boolean): Unit = this.isError = isError
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
index 492ae76b6..4992b1781 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
@@ -95,6 +95,10 @@ trait SimpleOnceJob extends OnceJob {
case operator => operator
}
+ def getEcServiceInstance: ServiceInstance = serviceInstance
+
+ def getEcTicketId: String = ticketId
+
}
class SubmittableSimpleOnceJob(
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
index 83399bf37..a1dba6340 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/operator/impl/EngineConnApplicationInfoOperator.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.computation.client.operator.impl
import org.apache.linkis.computation.client.once.result.EngineConnOperateResult
import org.apache.linkis.computation.client.operator.OnceJobOperator
+import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.ujes.client.exception.UJESJobException
class EngineConnApplicationInfoOperator extends
OnceJobOperator[ApplicationInfo] {
@@ -28,7 +29,7 @@ class EngineConnApplicationInfoOperator extends
OnceJobOperator[ApplicationInfo]
override protected def resultToObject(result: EngineConnOperateResult):
ApplicationInfo = {
ApplicationInfo(
result
- .getAsOption("applicationId")
+ .getAsOption(ECConstants.YARN_APPID_NAME_KEY)
.getOrElse(
throw new UJESJobException(
20300,
@@ -36,14 +37,14 @@ class EngineConnApplicationInfoOperator extends
OnceJobOperator[ApplicationInfo]
)
),
result
- .getAsOption("applicationUrl")
+ .getAsOption(ECConstants.YARN_APP_URL_KEY)
.getOrElse(
throw new UJESJobException(
20300,
s"Cannot get applicationUrl from EngineConn $getServiceInstance."
)
),
- result.getAs("queue")
+ result.getAs(ECConstants.QUEUE)
)
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java
similarity index 75%
copy from
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to
linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java
index e3b8548bf..37c6fc8d9 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/enums/OnceJobOperationBoundary.java
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.governance.common.enums;
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+public enum OnceJobOperationBoundary {
+ ECM("ecm"),
+ EC("ec");
- EMNode getEMNode();
+ private String name;
- void setEMNode(EMNode emNode);
+ OnceJobOperationBoundary(String name) {
+ this.name = name;
+ }
- String getLock();
-
- void setLock(String lock);
+ public String getName() {
+ return name;
+ }
}
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
index c0d295755..89d3c9eba 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/exception/engineconn/EngineConnExecutorErrorCode.java
@@ -37,4 +37,6 @@ public class EngineConnExecutorErrorCode {
public static final int SEND_TO_ENTRANCE_ERROR = 40105;
public static final int INIT_EXECUTOR_FAILED = 40106;
+
+ public static final int INVALID_APPLICATION_ID = 40107;
}
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
index fc7d1c890..0029faa91 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernanceCommonConf.scala
@@ -86,6 +86,10 @@ object GovernanceCommonConf {
CommonVars(envKey, "").getValue
}
+ // value ECConstants.EC_CLIENT_TYPE_ATTACH
+ val EC_APP_MANAGE_MODE =
+ CommonVars("linkis.ec.app.manage.mode", "attach")
+
val SCALA_PARSE_APPEND_CODE_ENABLED =
CommonVars("linkis.scala.parse.append.code.enable", true).getValue
diff --git
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
index fe48f6887..a94eadf42 100644
---
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
+++
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/constant/ec/ECConstants.scala
@@ -35,4 +35,43 @@ object ECConstants {
val YARN_QUEUE_NAME_CONFIG_KEY = "wds.linkis.rm.yarnqueue"
+ val QUEUE = "queue"
+
+ val EC_CLIENT_TYPE_ATTACH = "attach"
+
+ val EC_CLIENT_TYPE_DETACH = "detach"
+
+ val YARN_APPID_NAME_KEY = "applicationId"
+
+ val YARN_APP_URL_KEY = "applicationUrl"
+
+ val YARN_APP_NAME_KEY = "appicationName"
+
+ val YARN_MODE_KEY = "yarnMode"
+
+ val EC_SERVICE_INSTANCE_KEY = "serviceInstance"
+
+ val ECM_SERVICE_INSTANCE_KEY = "ecmServiceInstance"
+
+ val MANAGER_SERVICE_INSTANCE_KEY = "managerServiceInstance"
+
+ val NODE_STATUS_KEY = "nodeStatus"
+
+ val EC_LAST_UNLOCK_TIMESTAMP = "lastUnlockTimestamp"
+
+ val YARN_APP_TYPE_LIST_KEY = "yarnAppTypeList"
+
+ val YARN_APP_STATE_LIST_KEY = "yarnAppStateList"
+
+ val YARN_APP_TYPE_KEY = "yarnAppType"
+
+ val YARN_APP_TYPE_SPARK = "spark"
+
+ val YARN_APP_TYPE_FLINK = "flink"
+
+ val EC_OPERATE_LIST = "list"
+
+ val EC_OPERATE_STATUS = "status"
+
+ val YARN_APP_RESULT_LIST_KEY = "yarnAppResultList"
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
index de6bb440d..a84f58115 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/java/org/apache/linkis/engineconn/computation/concurrent/monitor/TimingMonitorService.java
@@ -55,7 +55,7 @@ public class TimingMonitorService implements
InitializingBean, Runnable {
@Override
public void afterPropertiesSet() throws Exception {
- if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM()) {
+ if ((Boolean)
AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM().getValue()) {
Utils.defaultScheduler()
.scheduleAtFixedRate(
this, 3 * 60 * 1000, MONITOR_INTERVAL.getValue().toLong(),
TimeUnit.MILLISECONDS);
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
index eccf54bfa..010ced97f 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/DefaultNodeHeartbeatMsgManager.scala
@@ -21,10 +21,13 @@ import org.apache.linkis.common.utils.{Logging, Utils}
import
org.apache.linkis.engineconn.acessible.executor.info.NodeHeartbeatMsgManager
import
org.apache.linkis.engineconn.computation.executor.metrics.ComputationEngineConnMetrics
import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.engineconn.executor.entity.{Executor,
SensibleExecutor}
+import org.apache.linkis.engineconn.executor.entity.{Executor,
SensibleExecutor, YarnExecutor}
import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.server.BDPJettyServerHelper
+import org.apache.commons.lang3.StringUtils
+
import org.springframework.stereotype.Component
import java.util
@@ -72,6 +75,22 @@ class DefaultNodeHeartbeatMsgManager extends
NodeHeartbeatMsgManager with Loggin
engineParams.get(ECConstants.YARN_QUEUE_NAME_CONFIG_KEY).asInstanceOf[Object]
)
}
+ executor match {
+ case yarnExecutor: YarnExecutor =>
+ if (StringUtils.isNotBlank(yarnExecutor.getQueue)) {
+ msgMap.put(ECConstants.YARN_QUEUE_NAME_KEY, yarnExecutor.getQueue)
+ }
+ if (StringUtils.isNotBlank(yarnExecutor.getApplicationId)) {
+ msgMap.put(ECConstants.YARN_APPID_NAME_KEY,
yarnExecutor.getApplicationId)
+ }
+ if (StringUtils.isNotBlank(yarnExecutor.getApplicationURL)) {
+ msgMap.put(ECConstants.YARN_APP_URL_KEY,
yarnExecutor.getApplicationURL)
+ }
+ if (StringUtils.isNotBlank(yarnExecutor.getYarnMode)) {
+ msgMap.put(ECConstants.YARN_MODE_KEY, yarnExecutor.getYarnMode)
+ }
+ case _ =>
+ }
Utils.tryCatch(BDPJettyServerHelper.gson.toJson(msgMap)) { case e:
Exception =>
val msgs = msgMap.asScala
.map { case (k, v) => if (null == v) s"${k}->null" else
s"${k}->${v.toString}" }
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
index 7c180731a..61242beaa 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/core/executor/ExecutorManager.scala
@@ -137,7 +137,7 @@ class LabelExecutorManagerImpl extends LabelExecutorManager
with Logging {
}
protected def getLabelKey(labels: Array[Label[_]]): String =
- labels.map(_.getStringValue).mkString("&")
+ labels.filter(null != _).map(_.getStringValue).mkString("&")
protected def createExecutor(
engineCreationContext: EngineCreationContext,
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
index 28a75d3f9..66e1c575f 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java
@@ -22,6 +22,7 @@ import
org.apache.linkis.engineconn.core.executor.ExecutorManager$;
import org.apache.linkis.engineconn.core.executor.LabelExecutorManager;
import org.apache.linkis.engineconn.executor.entity.Executor;
import org.apache.linkis.engineconn.executor.entity.YarnExecutor;
+import org.apache.linkis.governance.common.constant.ec.ECConstants;
import org.apache.linkis.manager.common.operator.Operator;
import java.util.HashMap;
@@ -43,10 +44,10 @@ public class EngineConnApplicationInfoOperator implements
Operator {
if (reportExecutor instanceof YarnExecutor) {
YarnExecutor yarnExecutor = (YarnExecutor) reportExecutor;
Map<String, Object> result = new HashMap<>();
- result.put("applicationId", yarnExecutor.getApplicationId());
- result.put("applicationUrl", yarnExecutor.getApplicationURL());
- result.put("queue", yarnExecutor.getQueue());
- result.put("yarnMode", yarnExecutor.getYarnMode());
+ result.put(ECConstants.YARN_APPID_NAME_KEY(),
yarnExecutor.getApplicationId());
+ result.put(ECConstants.YARN_APP_URL_KEY(),
yarnExecutor.getApplicationURL());
+ result.put(ECConstants.QUEUE(), yarnExecutor.getQueue());
+ result.put(ECConstants.YARN_MODE_KEY(), yarnExecutor.getYarnMode());
return result;
} else {
throw new EngineConnException(
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
index 26a25a153..95a01202e 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorConfiguration.scala
@@ -42,8 +42,8 @@ object AccessibleExecutorConfiguration {
val ENGINECONN_LOCK_CHECK_INTERVAL =
CommonVars("wds.linkis.engineconn.lock.free.interval", new TimeType("3m"))
- val ENGINECONN_SUPPORT_PARALLELISM: Boolean =
- CommonVars("wds.linkis.engineconn.support.parallelism", false).getValue
+ val ENGINECONN_SUPPORT_PARALLELISM =
+ CommonVars("wds.linkis.engineconn.support.parallelism", false)
val ENGINECONN_HEARTBEAT_TIME =
CommonVars("wds.linkis.engineconn.heartbeat.time", new TimeType("2m"))
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
index 53cdd44b0..93cb41f34 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
@@ -43,9 +43,13 @@ class AccessibleExecutorSpringConfiguration extends Logging {
def createLockManager(): LockService = {
val lockService =
- if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+ if
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) {
+ logger.info("Engine supports parallelism.")
new EngineConnConcurrentLockService
- } else new EngineConnTimedLockService
+ } else {
+ logger.info("Engine doesn't support parallelism.")
+ new EngineConnTimedLockService
+ }
asyncListenerBusContext.addListener(lockService)
lockService
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
new file mode 100644
index 000000000..12e42c66a
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/hook/OperationHook.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.acessible.executor.hook
+
+import org.apache.linkis.manager.common.protocol.engine.{
+ EngineOperateRequest,
+ EngineOperateResponse
+}
+
+import scala.collection.mutable.ArrayBuffer
+
+trait OperationHook {
+ def getName(): String
+
+ def doPreOperation(
+ engineOperateRequest: EngineOperateRequest,
+ engineOperateResponse: EngineOperateResponse
+ ): Unit
+
+ def doPostOperation(
+ engineOperateRequest: EngineOperateRequest,
+ engineOperateResponse: EngineOperateResponse
+ ): Unit
+
+}
+
+object OperationHook {
+ private var operationHooks: ArrayBuffer[OperationHook] = new
ArrayBuffer[OperationHook]()
+
+ def registerOperationHook(operationHook: OperationHook): Unit = {
+ operationHooks.append(operationHook)
+ }
+
+ def getOperationHooks(): Array[OperationHook] = operationHooks.toArray
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
index 06fd13b0e..8ef944fc9 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultAccessibleService.scala
@@ -74,7 +74,7 @@ class DefaultAccessibleService extends AccessibleService with
Logging {
DataWorkCloudApplication.getServiceInstance.equals(engineSuicideRequest.getServiceInstance)
) {
stopEngine()
- logger.info(s"engine will suiside now.")
+      logger.info(s"engine was asked to suicide by
${engineSuicideRequest.getUser} now.")
ShutdownHook.getShutdownHook.notifyStop()
} else {
if (null != engineSuicideRequest.getServiceInstance) {
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
index 067e0d2cb..ea3248ba6 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultExecutorHeartbeatService.scala
@@ -78,6 +78,7 @@ class DefaultExecutorHeartbeatService
heartbeatTime,
TimeUnit.MILLISECONDS
)
+ ExecutorHeartbeatServiceHolder.registerHeartBeatService(this)
}
/**
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
index 1040df40a..f82c5e944 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/DefaultOperateService.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.engineconn.acessible.executor.service
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook
import org.apache.linkis.manager.common.operator.OperatorFactory
import org.apache.linkis.manager.common.protocol.engine.{
EngineOperateRequest,
@@ -31,6 +32,8 @@ import org.springframework.stereotype.Service
import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
@Service
class DefaultOperateService extends OperateService with Logging {
@@ -38,27 +41,59 @@ class DefaultOperateService extends OperateService with
Logging {
override def executeOperation(
engineOperateRequest: EngineOperateRequest
): EngineOperateResponse = {
- val parameters = engineOperateRequest.getParameters()
+ var response: EngineOperateResponse = null
+
+ val parameters =
+
engineOperateRequest.getParameters.asScala.toMap.asInstanceOf[util.Map[String,
Object]]
val operator =
Utils.tryCatch(OperatorFactory.apply().getOperatorRequest(parameters)) { t =>
logger.error(s"Get operator failed, parameters is
${engineOperateRequest.getParameters}.", t)
- return new EngineOperateResponse(
- new util.HashMap,
+ response = new EngineOperateResponse(
+ new util.HashMap[String, Object](),
true,
ExceptionUtils.getRootCauseMessage(t)
)
+ doPostHook(engineOperateRequest, response)
+ return response
}
logger.info(
s"Try to execute operator ${operator.getClass.getSimpleName} with
parameters ${engineOperateRequest.getParameters}."
)
val result = Utils.tryCatch(operator(parameters)) { t =>
logger.error(s"Execute ${operator.getClass.getSimpleName} failed.", t)
- return new EngineOperateResponse(
- new util.HashMap,
+ response = new EngineOperateResponse(
+ new util.HashMap[String, Object](),
true,
ExceptionUtils.getRootCauseMessage(t)
)
+ doPostHook(engineOperateRequest, response)
+ return response
+ }
+ logger.info(s"End to execute operator ${operator.getClass.getSimpleName}.")
+ response = new EngineOperateResponse(result)
+ doPostHook(engineOperateRequest, response)
+ response
+ }
+
+ private def doPreHook(
+ engineOperateRequest: EngineOperateRequest,
+ engineOperateResponse: EngineOperateResponse
+ ): Unit = {
+ Utils.tryAndWarn {
+ OperationHook
+ .getOperationHooks()
+ .foreach(hook => hook.doPreOperation(engineOperateRequest,
engineOperateResponse))
+ }
+ }
+
+ private def doPostHook(
+ engineOperateRequest: EngineOperateRequest,
+ engineOperateResponse: EngineOperateResponse
+ ): Unit = {
+ Utils.tryAndWarn {
+ OperationHook
+ .getOperationHooks()
+ .foreach(hook => hook.doPostOperation(engineOperateRequest,
engineOperateResponse))
}
- new EngineOperateResponse(result)
}
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
index 21325f42b..b5bbc26f9 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/EngineConnTimedLockService.scala
@@ -51,7 +51,7 @@ class EngineConnTimedLockService extends LockService with
Logging {
private var lockType: EngineLockType = EngineLockType.Timed
private def isSupportParallelism: Boolean =
- AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM
+
AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()
/**
* @param lock
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
index bfecf7325..7abcbe8dc 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/service/ExecutorHeartbeatService.scala
@@ -34,3 +34,14 @@ trait ExecutorHeartbeatService {
def dealNodeHeartbeatRequest(nodeHeartbeatRequest: NodeHeartbeatRequest):
NodeHeartbeatMsg
}
+
+object ExecutorHeartbeatServiceHolder {
+
+ private var executorHeartbeatService: ExecutorHeartbeatService = _
+
+ def registerHeartBeatService(executorHeartbeatService:
ExecutorHeartbeatService): Unit =
+ this.executorHeartbeatService = executorHeartbeatService
+
+ def getDefaultHeartbeatService(): ExecutorHeartbeatService =
executorHeartbeatService
+
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
index 3734e3bdf..c05768739 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/exception/AMErrorCode.java
@@ -28,7 +28,11 @@ public enum AMErrorCode implements LinkisErrorCode {
NOT_EXISTS_ENGINE_CONN(210003, "Not exists EngineConn(不存在的引擎)"),
- AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)");
+ AM_CONF_ERROR(210004, "AM configuration error(AM配置错误)"),
+
+ ASK_ENGINE_ERROR_RETRY(210005, "Ask engine error, retry(请求引擎失败,重试)"),
+
+ EC_OPERATE_ERROR(210006, "Failed to execute operation(引擎操作失败)");
private final int errorCode;
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
index 3d6e0bc39..14cad1380 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java
@@ -23,28 +23,28 @@ import
org.apache.linkis.common.exception.LinkisRetryException;
import org.apache.linkis.common.utils.ByteTimeUtils;
import org.apache.linkis.common.utils.JsonUtils;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
+import org.apache.linkis.governance.common.constant.ec.ECConstants;
+import org.apache.linkis.governance.common.utils.JobUtils;
+import org.apache.linkis.governance.common.utils.LoggerUtils;
import org.apache.linkis.manager.am.conf.AMConfiguration;
import org.apache.linkis.manager.am.exception.AMErrorCode;
import org.apache.linkis.manager.am.exception.AMErrorException;
import org.apache.linkis.manager.am.manager.EngineNodeManager;
import org.apache.linkis.manager.am.service.ECResourceInfoService;
-import org.apache.linkis.manager.am.service.engine.EngineCreateService;
-import org.apache.linkis.manager.am.service.engine.EngineInfoService;
-import org.apache.linkis.manager.am.service.engine.EngineOperateService;
-import org.apache.linkis.manager.am.service.engine.EngineStopService;
+import org.apache.linkis.manager.am.service.engine.*;
import org.apache.linkis.manager.am.util.ECResourceInfoUtils;
import org.apache.linkis.manager.am.utils.AMUtils;
import org.apache.linkis.manager.am.vo.AMEngineNodeVo;
+import org.apache.linkis.manager.common.constant.AMConstant;
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.entity.node.AMEMNode;
+import org.apache.linkis.manager.common.entity.node.EMNode;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import
org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord;
-import org.apache.linkis.manager.common.protocol.engine.EngineCreateRequest;
-import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest;
-import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
+import org.apache.linkis.manager.common.protocol.engine.*;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
import
org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.UserModifiable;
import org.apache.linkis.manager.label.exception.LabelErrorException;
@@ -65,6 +65,7 @@ import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.*;
+import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -103,6 +104,8 @@ public class EngineRestfulApi {
@Autowired private ECResourceInfoService ecResourceInfoService;
+ @Autowired private EngineReuseService engineReuseService;
+
private final ObjectMapper objectMapper = new ObjectMapper();
private LabelBuilderFactory stdLabelBuilderFactory =
@@ -110,6 +113,183 @@ public class EngineRestfulApi {
private static final Logger logger =
LoggerFactory.getLogger(EngineRestfulApi.class);
+ @ApiOperation(value = "askEngineConn", response = Message.class)
+ @ApiOperationSupport(ignoreParameters = {"jsonNode"})
+ @RequestMapping(path = "/askEngineConn", method = RequestMethod.POST)
+ public Message askEngineConn(
+ HttpServletRequest req, @RequestBody EngineAskRequest engineAskRequest)
+ throws IOException, InterruptedException {
+ String userName = ModuleUserUtils.getOperationUser(req, "askEngineConn");
+ engineAskRequest.setUser(userName);
+ long timeout = engineAskRequest.getTimeOut();
+ if (timeout <= 0) {
+ timeout =
AMConfiguration.ENGINE_CONN_START_REST_MAX_WAIT_TIME.getValue().toLong();
+ engineAskRequest.setTimeOut(timeout);
+ }
+ Map<String, Object> retEngineNode = new HashMap<>();
+ logger.info(
+ "User {} try to ask an engineConn with maxStartTime {}.
EngineAskRequest is {}.",
+ userName,
+ ByteTimeUtils.msDurationToString(timeout),
+ engineAskRequest);
+ Sender sender = Sender.getSender(Sender.getThisServiceInstance());
+ EngineNode engineNode = null;
+
+ // try to reuse ec first
+ String taskId =
JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
+ LoggerUtils.setJobIdMDC(taskId);
+ logger.info("received task : {}, engineAskRequest : {}", taskId,
engineAskRequest);
+ if
(!engineAskRequest.getLabels().containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
+ EngineReuseRequest engineReuseRequest = new EngineReuseRequest();
+ engineReuseRequest.setLabels(engineAskRequest.getLabels());
+ engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut());
+ engineReuseRequest.setUser(engineAskRequest.getUser());
+ engineReuseRequest.setProperties(engineAskRequest.getProperties());
+ boolean end = false;
+ EngineNode reuseNode = null;
+ int count = 0;
+ int MAX_RETRY = 2;
+ while (!end) {
+ try {
+ reuseNode = engineReuseService.reuseEngine(engineReuseRequest,
sender);
+ end = true;
+ } catch (LinkisRetryException e) {
+ logger.error(
+ "task: {}, user: {} reuse engine failed", taskId,
engineReuseRequest.getUser(), e);
+ Thread.sleep(1000);
+ end = false;
+ count += 1;
+ if (count > MAX_RETRY) {
+ end = true;
+ }
+ } catch (Exception e1) {
+ logger.info(
+ "task: {} user: {} reuse engine failed", taskId,
engineReuseRequest.getUser(), e1);
+ end = true;
+ }
+ }
+ if (null != reuseNode) {
+ logger.info(
+ "Finished to ask engine for task: {}, user: {} by reuse node {}",
+ taskId,
+ engineReuseRequest.getUser(),
+ reuseNode);
+ LoggerUtils.removeJobIdMDC();
+ engineNode = reuseNode;
+ }
+ }
+
+ if (null != engineNode) {
+ fillResultEngineNode(retEngineNode, engineNode);
+ return Message.ok("reuse engineConn ended.").data("engine",
retEngineNode);
+ }
+
+ String engineAskAsyncId = AMUtils.getAsyncId();
+ Callable<Object> createECTask =
+ new Callable() {
+ @Override
+ public Object call() {
+ LoggerUtils.setJobIdMDC(taskId);
+ logger.info(
+ "Task: {}, start to async({}) createEngine: {}",
+ taskId,
+ engineAskAsyncId,
+ engineAskRequest.getCreateService());
+              // If the original labels contain engineInstance, remove it first
+ engineAskRequest.getLabels().remove("engineInstance");
+ EngineCreateRequest engineCreateRequest = new
EngineCreateRequest();
+ engineCreateRequest.setLabels(engineAskRequest.getLabels());
+ engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
+ engineCreateRequest.setUser(engineAskRequest.getUser());
+
engineCreateRequest.setProperties(engineAskRequest.getProperties());
+
engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
+ try {
+ EngineNode createNode =
engineCreateService.createEngine(engineCreateRequest, sender);
+ long timeout = 0L;
+ if (engineCreateRequest.getTimeout() <= 0) {
+ timeout =
AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong();
+ } else {
+ timeout = engineCreateRequest.getTimeout();
+ }
+                // useEngine must be called with a timeout
+ EngineNode createEngineNode =
engineNodeManager.useEngine(createNode, timeout);
+ if (null == createEngineNode) {
+                throw new LinkisRetryException(
+                    AMConstant.EM_ERROR_CODE,
+                    "create engine " + createNode.getServiceInstance()
+                        + " success, but to use engine failed");
+ }
+              logger.info(
+                  "Task: {} finished to ask engine for user {} by create node {}",
+                  taskId, engineAskRequest.getUser(), createEngineNode);
+ return createEngineNode;
+ } catch (Exception e) {
+ logger.error(
+ "Task: {} failed to ask engine for user {} by create node",
taskId, userName, e);
+ return new LinkisRetryException(AMConstant.EM_ERROR_CODE,
e.getMessage());
+ } finally {
+ LoggerUtils.removeJobIdMDC();
+ }
+ }
+ };
+
+ try {
+ Object rs = createECTask.call();
+ if (rs instanceof LinkisRetryException) {
+ throw (LinkisRetryException) rs;
+ } else {
+ engineNode = (EngineNode) rs;
+ }
+ } catch (LinkisRetryException retryException) {
+ logger.error(
+ "User {} create engineConn failed get retry exception. can be
Retry",
+ userName,
+ retryException);
+ return Message.error(
+ String.format(
+ "Create engineConn failed, caused by %s.",
+ ExceptionUtils.getRootCauseMessage(retryException)))
+ .data("canRetry", true);
+ } catch (Exception e) {
+ LoggerUtils.removeJobIdMDC();
+ logger.error("User {} create engineConn failed get retry exception",
userName, e);
+ return Message.error(
+ String.format(
+ "Create engineConn failed, caused by %s.",
ExceptionUtils.getRootCauseMessage(e)));
+ }
+
+ LoggerUtils.removeJobIdMDC();
+ fillResultEngineNode(retEngineNode, engineNode);
+ logger.info(
+        "Finished to create an engineConn for user {}. NodeInfo is {}.",
userName, engineNode);
+ // to transform to a map
+ return Message.ok("create engineConn ended.").data("engine",
retEngineNode);
+ }
+
+ private void fillNullNode(
+ Map<String, Object> retEngineNode, EngineAskAsyncResponse
askAsyncResponse) {
+ retEngineNode.put(AMConstant.EC_ASYNC_START_RESULT_KEY,
AMConstant.EC_ASYNC_START_RESULT_FAIL);
+ retEngineNode.put(
+ AMConstant.EC_ASYNC_START_FAIL_MSG_KEY,
+        "Got null response for asyncId : " + askAsyncResponse.getId());
+ retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(),
Sender.getThisServiceInstance());
+ }
+
+ private void fillResultEngineNode(Map<String, Object> retEngineNode,
EngineNode engineNode) {
+ retEngineNode.put(
+ AMConstant.EC_ASYNC_START_RESULT_KEY,
AMConstant.EC_ASYNC_START_RESULT_SUCCESS);
+ retEngineNode.put("serviceInstance", engineNode.getServiceInstance());
+ if (null == engineNode.getNodeStatus()) {
+ engineNode.setNodeStatus(NodeStatus.Starting);
+ }
+ retEngineNode.put(ECConstants.NODE_STATUS_KEY(),
engineNode.getNodeStatus().toString());
+ retEngineNode.put(ECConstants.EC_TICKET_ID_KEY(),
engineNode.getTicketId());
+ EMNode emNode = engineNode.getEMNode();
+ if (null != emNode) {
+ retEngineNode.put(
+ ECConstants.ECM_SERVICE_INSTANCE_KEY(),
engineNode.getEMNode().getServiceInstance());
+ }
+ retEngineNode.put(ECConstants.MANAGER_SERVICE_INSTANCE_KEY(),
Sender.getThisServiceInstance());
+ }
+
@ApiOperation(value = "createEngineConn", response = Message.class)
@ApiOperationSupport(ignoreParameters = {"jsonNode"})
@RequestMapping(path = "/createEngineConn", method = RequestMethod.POST)
@@ -149,13 +329,7 @@ public class EngineRestfulApi {
"Finished to create a engineConn for user {}. NodeInfo is {}.",
userName, engineNode);
// to transform to a map
Map<String, Object> retEngineNode = new HashMap<>();
- retEngineNode.put("serviceInstance", engineNode.getServiceInstance());
- if (null == engineNode.getNodeStatus()) {
- engineNode.setNodeStatus(NodeStatus.Starting);
- }
- retEngineNode.put("nodeStatus", engineNode.getNodeStatus().toString());
- retEngineNode.put("ticketId", engineNode.getTicketId());
- retEngineNode.put("ecmServiceInstance",
engineNode.getEMNode().getServiceInstance());
+ fillResultEngineNode(retEngineNode, engineNode);
return Message.ok("create engineConn succeed.").data("engine",
retEngineNode);
}
@@ -173,6 +347,7 @@ public class EngineRestfulApi {
} catch (Exception e) {
logger.info("Instances {} does not exist",
serviceInstance.getInstance());
}
+ String ecMetrics = null;
if (null == engineNode) {
ECResourceInfoRecord ecInfo = null;
if (null != ticketIdNode) {
@@ -189,12 +364,19 @@ public class EngineRestfulApi {
if (null == ecInfo) {
return Message.error("Instance does not exist " + serviceInstance);
}
+ if (null == ecMetrics) {
+ ecMetrics = ecInfo.getMetrics();
+ }
engineNode = ECResourceInfoUtils.convertECInfoTOECNode(ecInfo);
+ } else {
+ ecMetrics = engineNode.getEcMetrics();
}
if (!userName.equals(engineNode.getOwner()) &&
Configuration.isNotAdmin(userName)) {
return Message.error("You have no permission to access EngineConn " +
serviceInstance);
}
- return Message.ok().data("engine", engineNode);
+ Message result = Message.ok().data("engine", engineNode);
+ result.data(AMConstant.EC_METRICS_KEY, ecMetrics);
+ return result;
}
@ApiOperation(value = "kill egineconn", notes = "kill engineconn", response
= Message.class)
@@ -487,6 +669,11 @@ public class EngineRestfulApi {
ServiceInstance serviceInstance = getServiceInstance(jsonNode);
logger.info("User {} try to execute Engine Operation {}.", userName,
serviceInstance);
EngineNode engineNode = engineNodeManager.getEngineNode(serviceInstance);
+ if (null == engineNode) {
+ return Message.ok()
+ .data("isError", true)
+ .data("errorMsg", "Ec : " + serviceInstance.toString() + " not
found.");
+ }
if (!userName.equals(engineNode.getOwner()) &&
Configuration.isNotAdmin(userName)) {
return Message.error("You have no permission to execute Engine Operation
" + serviceInstance);
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
index 36675ff84..ab5799063 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.java
@@ -22,6 +22,7 @@ import org.apache.linkis.governance.common.utils.JobUtils;
import org.apache.linkis.governance.common.utils.LoggerUtils;
import org.apache.linkis.manager.am.conf.AMConfiguration;
import org.apache.linkis.manager.am.util.LinkisUtils;
+import org.apache.linkis.manager.am.utils.AMUtils;
import org.apache.linkis.manager.common.constant.AMConstant;
import org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.common.protocol.engine.*;
@@ -36,7 +37,6 @@ import org.springframework.stereotype.Service;
import java.net.SocketTimeoutException;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
import feign.RetryableException;
import org.slf4j.Logger;
@@ -50,8 +50,6 @@ public class DefaultEngineAskEngineService extends
AbstractEngineService
private EngineCreateService engineCreateService;
private EngineReuseService engineReuseService;
- private AtomicInteger idCreator = new AtomicInteger();
- private String idPrefix = Sender.getThisServiceInstance().getInstance();
private static final ThreadPoolExecutor EXECUTOR =
LinkisUtils.newCachedThreadPool(
@@ -103,7 +101,7 @@ public class DefaultEngineAskEngineService extends
AbstractEngineService
}
}
- String engineAskAsyncId = getAsyncId();
+ String engineAskAsyncId = AMUtils.getAsyncId();
CompletableFuture<EngineNode> createNodeThread =
CompletableFuture.supplyAsync(
() -> {
@@ -197,8 +195,4 @@ public class DefaultEngineAskEngineService extends
AbstractEngineService
LoggerUtils.removeJobIdMDC();
return new EngineAskAsyncResponse(engineAskAsyncId,
Sender.getThisServiceInstance());
}
-
- private String getAsyncId() {
- return idPrefix + "_" + idCreator.getAndIncrement();
- }
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
index 5fc852966..85c7470ce 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java
@@ -125,6 +125,7 @@ public class ECResourceInfoUtils {
engineNode.setTicketId(ecInfo.getTicketId());
engineNode.setStartTime(ecInfo.getCreateTime());
engineNode.setUpdateTime(ecInfo.getReleaseTime());
+ engineNode.setEcMetrics(ecInfo.getMetrics());
return engineNode;
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
index 43144e53f..660d39323 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/utils/AMUtils.java
@@ -25,11 +25,13 @@ import
org.apache.linkis.manager.common.entity.node.EngineNode;
import org.apache.linkis.manager.common.entity.resource.*;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.rpc.Sender;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,6 +43,9 @@ import org.slf4j.LoggerFactory;
public class AMUtils {
private static final Logger logger = LoggerFactory.getLogger(AMUtils.class);
+ private static final AtomicInteger idCreator = new AtomicInteger();
+ private static String idPrefix =
Sender.getThisServiceInstance().getInstance();
+
private static Gson GSON =
new GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
@@ -335,4 +340,8 @@ public class AMUtils {
return false;
}
}
+
+ public static String getAsyncId() {
+ return idPrefix + "_" + idCreator.getAndIncrement();
+ }
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index bcb14a704..362932083 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -63,4 +63,6 @@ public class LabelKeyConstant {
public static final String TENANT_KEY = "tenant";
public static final String FIXED_EC_KEY = "fixedEngineConn";
+
+ public static final String MANAGER_KEY = "manager";
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java
new file mode 100644
index 000000000..674cc605a
--- /dev/null
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/ManagerLabel.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EngineNodeLabel;
+import org.apache.linkis.manager.label.entity.Feature;
+import org.apache.linkis.manager.label.entity.GenericLabel;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+
+import java.util.HashMap;
+
+public class ManagerLabel extends GenericLabel implements EngineNodeLabel {
+
+ public ManagerLabel() {
+ setLabelKey(LabelKeyConstant.MANAGER_KEY);
+ }
+
+ @Override
+ public Feature getFeature() {
+ return Feature.CORE;
+ }
+
+ public String getManager() {
+ if (null == getValue()) {
+ return null;
+ }
+ return getValue().get(getLabelKey());
+ }
+
+ @ValueSerialNum(0)
+ public void setManager(String manager) {
+ if (null == getValue()) {
+ setValue(new HashMap<>());
+ }
+ getValue().put(getLabelKey(), manager);
+ }
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index a24445269..21a067ed4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -42,6 +42,7 @@ object RunType extends Enumeration {
val ES_JSON = Value("esjson")
val TRINO_SQL = Value("tsql")
+ val JSON = Value("json")
val SEATUNNEL_ZETA = Value("szeta")
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
index 081e5e605..09d802a95 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/AMConstant.java
@@ -34,4 +34,30 @@ public class AMConstant {
public static final String START_REASON = "start_reason";
public static final String EC_CAN_RETRY = "ec_can_try";
+
+ public static final String EC_ASYNC_START_ID_KEY = "ecAsyncStartId";
+
+ public static final String EC_ASYNC_START_MANAGER_INSTANCE_KEY =
"managerInstance";
+
+ /*
+ result : starting,success,failed
+ */
+ public static final String EC_ASYNC_START_RESULT_KEY = "ecAsyncStartResult";
+
+ /*
+ default false
+ */
+ public static final String EC_SYNC_START_KEY = "ecSyncStart";
+
+ public static final String EC_ASYNC_START_RESULT_SUCCESS = "success";
+
+ public static final String EC_ASYNC_START_RESULT_FAIL = "failed";
+
+ public static final String EC_ASYNC_START_RESULT_STARTING = "starting";
+
+ public static final String EC_ASYNC_START_FAIL_RETRY_KEY = "canRetry";
+
+ public static final String EC_ASYNC_START_FAIL_MSG_KEY = "failMsg";
+
+ public static final String EC_METRICS_KEY = "ecMetrics";
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
index 7343aab2d..a560eec33 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEngineNode.java
@@ -62,6 +62,8 @@ public class AMEngineNode implements EngineNode,
ScoreServiceInstance {
private String ticketId;
+ private String ecMetrics;
+
public AMEngineNode() {}
public AMEngineNode(double score, ServiceInstance serviceInstance) {
@@ -210,6 +212,16 @@ public class AMEngineNode implements EngineNode,
ScoreServiceInstance {
this.ticketId = ticketId;
}
+ @Override
+ public String getEcMetrics() {
+ return ecMetrics;
+ }
+
+ @Override
+ public void setEcMetrics(String metrics) {
+ this.ecMetrics = metrics;
+ }
+
@Override
public Date getUpdateTime() {
return updateTime;
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
index e3b8548bf..627b41bc5 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
@@ -26,4 +26,12 @@ public interface EngineNode extends AMNode, RMNode,
LabelNode {
String getLock();
void setLock(String lock);
+
+ String getTicketId();
+
+ void setTicketId(String ticketId);
+
+ String getEcMetrics();
+
+ void setEcMetrics(String metrics);
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
index 8637887e5..a3e02f021 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistencerEcNodeInfo.java
@@ -58,7 +58,10 @@ public class PersistencerEcNodeInfo extends PersistenceNode {
+ ", engineType='"
+ engineType
+ '\''
- + "} "
+ + ", heartbeatMsg='"
+ + heartbeatMsg
+ + '\''
+ + '}'
+ super.toString();
}
}
diff --git
a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java
b/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java
deleted file mode 100644
index ddf4838cd..000000000
---
a/linkis-engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.engineconnplugin.flink.operator;
-
-import org.apache.linkis.common.exception.WarnException;
-import
org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager$;
-import
org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter;
-import
org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary;
-import
org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
-import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor;
-import org.apache.linkis.manager.common.operator.Operator;
-
-import java.text.MessageFormat;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TriggerSavepointOperator implements Operator {
-
- private static final Logger logger =
LoggerFactory.getLogger(TriggerSavepointOperator.class);
-
- @Override
- public String[] getNames() {
- return new String[] {"doSavepoint"};
- }
-
- @Override
- public Map<String, Object> apply(Map<String, Object> parameters) {
- String savepoint = getAsThrow(parameters, "savepointPath");
- String mode = getAsThrow(parameters, "mode");
- logger.info("try to " + mode + " savepoint with path " + savepoint + ".");
-
- if (OnceExecutorManager$.MODULE$.getInstance().getReportExecutor()
- instanceof FlinkOnceExecutor) {
- FlinkOnceExecutor flinkExecutor =
- (FlinkOnceExecutor)
OnceExecutorManager$.MODULE$.getInstance().getReportExecutor();
- ClusterDescriptorAdapter clusterDescriptorAdapter =
- (ClusterDescriptorAdapter)
flinkExecutor.getClusterDescriptorAdapter();
- String writtenSavepoint = "";
- try {
- writtenSavepoint = clusterDescriptorAdapter.doSavepoint(savepoint,
mode);
- } catch (JobExecutionException e) {
- logger.info("doSavepoint failed", e);
- throw new RuntimeException(e);
- }
-
- Map<String, Object> stringMap = new HashMap<>();
- stringMap.put("writtenSavepoint", writtenSavepoint);
- return stringMap;
- } else {
- throw new WarnException(
- FlinkErrorCodeSummary.NOT_SUPPORT_SAVEPOTION.getErrorCode(),
- MessageFormat.format(
- FlinkErrorCodeSummary.NOT_SUPPORT_SAVEPOTION.getErrorDesc(),
- OnceExecutorManager$.MODULE$
- .getInstance()
- .getReportExecutor()
- .getClass()
- .getSimpleName()));
- }
- }
-
- public <T> T getAsThrow(Map<String, Object> parameters, String key) {
- Object value = parameters.get(key);
- if (value != null) {
- try {
- return (T) value;
- } catch (Exception e) {
- throw new IllegalArgumentException("parameter " + key + " is
invalid.", e);
- }
- } else {
- throw new IllegalArgumentException("parameter " + key + " is required.");
- }
- }
-}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index 5f93b690e..8115a128c 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -132,6 +132,9 @@ object FlinkEnvConfiguration {
val FLINK_ONCE_APP_STATUS_FETCH_INTERVAL =
CommonVars("flink.app.fetch.status.interval", new TimeType("5s"))
+ val FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL =
+ CommonVars("flink.app.report.appid.interval", new TimeType("60s"))
+
val FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX =
CommonVars("flink.app.fetch.status.failed.num", 3)
val FLINK_REPORTER_ENABLE = CommonVars("linkis.flink.reporter.enable", false)
@@ -153,4 +156,18 @@ object FlinkEnvConfiguration {
val FLINK_PARAMS_BLANK_PLACEHOLER =
CommonVars("linkis.flink.params.placeholder.blank", "\u0001")
+ val FLINK_MANAGER_MODE_CONFIG_KEY =
CommonVars("linkis.flink.manager.mode.on", false)
+
+ val FLINK_MANAGER_LOAD_TASK_MAX =
CommonVars("linkis.flink.manager.load.task.max", 50)
+
+ val HADOOP_CONF_DIR = CommonVars("linkis.flink.hadoop.conf.dir",
System.getenv("HADOOP_CONF_DIR"))
+
+ val FLINK_MANAGER_CLIENT_MAX_NUM = CommonVars("linkis.flink.client.num.max",
200)
+
+ val FLINK_MANAGER_CLIENT_EXPIRE_MILLS =
+ CommonVars("linkis.flink.client.expire.mills", 3600 * 1000)
+
+ val FLINK_HANDSHAKE_WAIT_TIME_MILLS =
+ CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000)
+
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala
similarity index 56%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
copy to
linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala
index 53cdd44b0..e0519e548 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/acessible/executor/conf/AccessibleExecutorSpringConfiguration.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkSrpingConfiguration.scala
@@ -15,25 +15,22 @@
* limitations under the License.
*/
-package org.apache.linkis.engineconn.acessible.executor.conf
+package org.apache.linkis.engineconnplugin.flink.config
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineconn.acessible.executor.info.{
- DefaultNodeOverLoadInfoManager,
- NodeOverLoadInfoManager
-}
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.service.{
EngineConnConcurrentLockService,
EngineConnTimedLockService,
LockService
}
import
org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
+import org.apache.linkis.engineconnplugin.flink.util.ManagerUtil
import
org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.{Bean, Configuration}
@Configuration
-class AccessibleExecutorSpringConfiguration extends Logging {
+class FlinkSrpingConfiguration extends Logging {
private val asyncListenerBusContext =
ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus
@@ -43,18 +40,41 @@ class AccessibleExecutorSpringConfiguration extends Logging
{
def createLockManager(): LockService = {
val lockService =
- if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+ if (ManagerUtil.isManager) {
+ logger.info("Engine is manager, supports parallelism.")
new EngineConnConcurrentLockService
- } else new EngineConnTimedLockService
+ } else {
+ logger.info("Engine is not manager, doesn't support parallelism.")
+ new EngineConnTimedLockService
+ }
asyncListenerBusContext.addListener(lockService)
+ FlinkLockerServiceHolder.registerLockService(lockService)
lockService
}
- @Bean
- @ConditionalOnMissingBean(Array(classOf[NodeOverLoadInfoManager]))
- def createNodeOverLoadInfoManager(): NodeOverLoadInfoManager = {
+}
+
+object FlinkLockerServiceHolder extends Logging {
- new DefaultNodeOverLoadInfoManager
+ private var lockService: LockService = _
+
+ def registerLockService(service: LockService): Unit = {
+ Utils.tryAndError {
+ if (null != service) {
+ if (null == lockService) {
+ logger.info(s"Will register lockService :
${service.getClass.getName}")
+ lockService = service
+ } else {
+ logger.warn(
+ s"Default lockService has been registered to
${lockService.getClass.getName}, will not register :
${service.getClass.getName}"
+ )
+ }
+ } else {
+ logger.warn("Cannot register null lockService")
+ }
+ }
}
+ def getDefaultLockService(): LockService = lockService
+
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala
similarity index 65%
copy from
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to
linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala
index e3b8548bf..6b3a0d356 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/constants/FlinkECConstant.scala
@@ -15,15 +15,23 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.engineconnplugin.flink.constants
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+object FlinkECConstant {
- EMNode getEMNode();
+ val FLINK_MANAGER_OPERATION_TYPE_KEY = "operationType"
- void setEMNode(EMNode emNode);
+ val FLINK_OPERATION_BOUNDARY_KEY = "operationBoundary"
- String getLock();
+ val EC_INSTANCE_KEY = "ecInstance"
- void setLock(String lock);
+ val MSG_KEY = "msg"
+
+ val SNAPHOT_KEY = "snapshot"
+
+ val SAVAPOINT_PATH_KEY = "savepointPath"
+
+ val SAVEPOINT_MODE_KEY = "mode"
+
+ val RESULT_SAVEPOINT_KEY = "writtenSavepoint"
}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
index 40bc732e5..eac436851 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
@@ -52,6 +52,8 @@ class EnvironmentContext(
private var deploymentTarget: String = YarnDeploymentTarget.PER_JOB.getName
+ private var extraParams: util.Map[String, Any] = _
+
def this(
defaultEnv: Environment,
systemConfiguration: Configuration,
@@ -63,7 +65,8 @@ class EnvironmentContext(
providedLibDirsArray: Array[String],
shipDirsArray: Array[String],
dependencies: util.List[URL],
- flinkExecutionTarget: String
+ flinkExecutionTarget: String,
+ extraParams: util.Map[String, Any]
) {
this(
defaultEnv,
@@ -89,6 +92,8 @@ class EnvironmentContext(
this.flinkConfig.set(LinkisYarnClusterClientFactory.YARN_CONFIG_DIR,
this.yarnConfDir)
// set flink dist-jar(设置 flink dist jar)
this.flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR, distJarPath)
+ // other params
+ this.extraParams = extraParams
}
}
@@ -114,6 +119,13 @@ class EnvironmentContext(
def getDependencies: util.List[URL] = dependencies
+ def setExtraParams(params: util.Map[String, Any]): EnvironmentContext = {
+ this.extraParams = params
+ this
+ }
+
+ def getExtraParams(): util.Map[String, Any] = extraParams
+
override def equals(o: Any): Boolean = o match {
case context: EnvironmentContext =>
if (this eq context) return true
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
index 8e2da4fbe..659cfcbc2 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
@@ -18,13 +18,27 @@
package org.apache.linkis.engineconnplugin.flink.executor
import org.apache.linkis.common.utils.Utils
+import
org.apache.linkis.engineconn.acessible.executor.service.ExecutorHeartbeatServiceHolder
+import org.apache.linkis.engineconn.executor.service.ManagerService
import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
-import
org.apache.linkis.engineconnplugin.flink.client.deployment.AbstractApplicationClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.client.deployment.{
+ AbstractApplicationClusterDescriptorAdapter,
+ YarnApplicationClusterDescriptorAdapter
+}
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.operator.StatusOperator
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.commons.lang3.StringUtils
+import java.util
+import java.util.concurrent.{Future, TimeUnit}
+
import scala.concurrent.duration.Duration
class FlinkJarOnceExecutor(
@@ -32,6 +46,10 @@ class FlinkJarOnceExecutor(
override protected val flinkEngineConnContext: FlinkEngineConnContext
) extends FlinkOnceExecutor[AbstractApplicationClusterDescriptorAdapter] {
+ private var daemonThread: Future[_] = _
+
+ private var firstReportAppIdTimestampMills: Long = 0L
+
override def doSubmit(
onceExecutorExecutionContext: OnceExecutorExecutionContext,
options: Map[String, String]
@@ -48,6 +66,73 @@ class FlinkJarOnceExecutor(
Utils.waitUntil(() => clusterDescriptor.initJobId(), Duration.Inf)
setJobID(clusterDescriptor.getJobId.toHexString)
super.waitToRunning()
+ if
(YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams()))
{
+ waitToExit()
+ }
+ }
+
+ override def close(): Unit = {
+ super.close()
+ if (null != daemonThread) {
+ daemonThread.cancel(true)
+ }
+ }
+
+ override protected def closeYarnApp(): Unit = {
+ if
(YarnUtil.isDetach(flinkEngineConnContext.getEnvironmentContext.getExtraParams()))
{
+ if (getStatus == NodeStatus.Failed) {
+ logger.info("Will kill yarn app on close with clientType : detach,
because status failed.")
+ super.closeYarnApp()
+ } else {
+ logger.info("Skip to kill yarn app on close with clientType : detach.")
+ }
+ } else {
+ logger.info("Will kill yarn app on close with clientType : attach.")
+ super.closeYarnApp()
+ }
+ }
+
+ private def waitToExit(): Unit = {
+ // upload applicationId to manager and then exit
+ val thisExecutor = this
+ if (!isCompleted) {
+ daemonThread = Utils.defaultScheduler.scheduleWithFixedDelay(
+ new Runnable {
+ override def run(): Unit = {
+ if (!isCompleted) {
+ Utils.waitUntil(() => StringUtils.isNotBlank(getApplicationId),
Duration.apply("10s"))
+ if (StringUtils.isNotBlank(getApplicationId)) {
+ Utils.tryAndWarn {
+ val heartbeatService =
ExecutorHeartbeatServiceHolder.getDefaultHeartbeatService()
+ if (null == heartbeatService) {
+ logger.error("HeartbeatService not inited.")
+ return null
+ }
+ val heartbeatMsg =
heartbeatService.generateHeartBeatMsg(thisExecutor)
+
ManagerService.getManagerService.heartbeatReport(heartbeatMsg)
+ logger.info(
+ s"Succeed to report heatbeatMsg :
${heartbeatMsg.getHeartBeatMsg}, will add handshake."
+ )
+ if (0L >= firstReportAppIdTimestampMills) {
+ firstReportAppIdTimestampMills = System.currentTimeMillis()
+ }
+ if (!StatusOperator.isHandshaked) {
+ StatusOperator.addHandshake()
+ } else {
+ logger.info("Will exit with handshaked.")
+ trySucceed()
+ }
+ }
+ }
+ }
+ }
+ },
+ 1000,
+
FlinkEnvConfiguration.FLINK_ONCE_JAR_APP_REPORT_APPLICATIONID_INTERVAL.getValue.toLong,
+ TimeUnit.MILLISECONDS
+ )
+ logger.info("waitToExit submited.")
+ }
}
}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
new file mode 100644
index 000000000..b204c2405
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkManagerConcurrentExecutor.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.executor
+
+import org.apache.linkis.common.utils.Logging
+import
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
+import
org.apache.linkis.engineconn.acessible.executor.service.EngineConnConcurrentLockService
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.computation.executor.execute.{
+ ComputationExecutor,
+ ConcurrentComputationExecutor,
+ EngineExecutionContext
+}
+import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
+import
org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.config.FlinkLockerServiceHolder
+import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.util.ManagerUtil
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.protocol.engine.JobProgressInfo
+import org.apache.linkis.scheduler.executer.{
+ AsynReturnExecuteResponse,
+ ErrorExecuteResponse,
+ ExecuteResponse
+}
+
+class FlinkManagerConcurrentExecutor(
+ val id: Long,
+ maxRunningNumber: Int,
+ val flinkEngineConnContext: FlinkEngineConnContext
+) extends FlinkOnceExecutor[ClusterDescriptorAdapter]
+ with FlinkExecutor
+ with Logging {
+
+ override protected def submit(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext
+ ): Unit = {
+ logger.info("Succeed to init FlinkManagerExecutor.")
+ }
+
+ override def execute(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext
+ ): ExecuteResponse = {
+ val isManager = ManagerUtil.isManager
+ val lockService = FlinkLockerServiceHolder.getDefaultLockService()
+ if (
+ isManager && null != lockService && lockService
+ .isInstanceOf[EngineConnConcurrentLockService]
+ ) {
+ val msg = "Succeed to init FlinkManagerExecutor."
+ logger.info(msg)
+ new AsynReturnExecuteResponse {
+ override def notify(rs: ExecuteResponse => Unit): Unit = {
+ logger.info(s"FlinkManagerExecutor will skip listener : ${rs}")
+ }
+ }
+ } else {
+ ErrorExecuteResponse(
+ "FlinkManagerExecutor got default lockService is not instance of
EngineConnConcurrentLockService, will shutdown.",
+ null
+ )
+ }
+ }
+
+ override def getId: String = id.toString
+
+ override def close(): Unit = {
+ logger.info(s"FlinkManagerExecutor : ${getId} will close.")
+ super.close()
+ }
+
+ def getMaxRunningNumber: Int = maxRunningNumber
+
+ def getFlinkContext(): FlinkEngineConnContext = flinkEngineConnContext
+
+ override def doSubmit(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext,
+ options: Map[String, String]
+ ): Unit = submit(onceExecutorExecutionContext)
+
+ override protected def initOnceExecutorExecutionContext(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext
+ ): Unit = {}
+
+ override protected def createOnceExecutorExecutionContext(
+ engineCreationContext: EngineCreationContext
+ ): OnceExecutorExecutionContext = new OnceExecutorExecutionContext(null,
null)
+
+ override def tryReady(): Boolean = {
+ // set default status to Unlock
+ transition(NodeStatus.Unlock)
+ true
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
index f3d15a7e9..c30e88584 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
@@ -120,9 +120,7 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
if (daemonThread != null) daemonThread.cancel(true)
}
- override def close(): Unit = {
- super.close()
- closeDaemon()
+ protected def closeYarnApp(): Unit = {
if (clusterDescriptor != null) {
clusterDescriptor.cancelJob()
clusterDescriptor.close()
@@ -130,6 +128,12 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter]
flinkEngineConnContext.getExecutionContext.getClusterClientFactory.close()
}
+ override def close(): Unit = {
+ super.close()
+ closeDaemon()
+ closeYarnApp()
+ }
+
override protected def waitToRunning(): Unit = {
if (!isCompleted) {
daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index e6cdcc5da..3d1b83447 100644
---
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -17,8 +17,11 @@
package org.apache.linkis.engineconnplugin.flink.factory
+import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{ClassUtils, Logging}
+import
org.apache.linkis.engineconn.acessible.executor.conf.AccessibleExecutorConfiguration
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.engineconnplugin.flink.client.config.Environment
import
org.apache.linkis.engineconnplugin.flink.client.config.entries.ExecutionEntry
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext
@@ -32,7 +35,8 @@ import
org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, Fli
import
org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
import
org.apache.linkis.engineconnplugin.flink.exception.FlinkInitFailedException
import org.apache.linkis.engineconnplugin.flink.setting.Settings
-import org.apache.linkis.engineconnplugin.flink.util.ClassUtil
+import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil}
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
@@ -41,6 +45,7 @@ import
org.apache.linkis.manager.engineplugin.common.creation.{
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine._
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
+import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration._
@@ -108,6 +113,19 @@ class FlinkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
val providedLibDirsArray =
FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
// Ship directories
val shipDirsArray = getShipDirectories(options)
+ // other params
+ val flinkClientType =
GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue(options)
+ val otherParams = new util.HashMap[String, Any]()
+ val isManager = ManagerUtil.isManager
+ if (isManager) {
+// logger.info(
+// s"flink manager mode on. Will set
${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.key} to true."
+// )
+ logger.info(
+ s"support parallelism :
${AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()}"
+ )
+ }
+ otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key,
flinkClientType.toLowerCase())
val context = new EnvironmentContext(
defaultEnv,
new Configuration,
@@ -119,7 +137,8 @@ class FlinkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
providedLibDirsArray,
shipDirsArray,
new util.ArrayList[URL],
- flinkExecutionTarget
+ flinkExecutionTarget,
+ otherParams
)
// Step1: environment-level configurations
val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink")
@@ -445,8 +464,14 @@ class FlinkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
): FlinkEngineConnContext =
new FlinkEngineConnContext(environmentContext)
- override protected def getDefaultExecutorFactoryClass: Class[_ <:
ExecutorFactory] =
- classOf[FlinkCodeExecutorFactory]
+ override protected def getDefaultExecutorFactoryClass: Class[_ <:
ExecutorFactory] = {
+ val options = EngineConnServer.getEngineCreationContext.getOptions
+ if (FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue(options))
{
+ classOf[FlinkManagerExecutorFactory]
+ } else {
+ classOf[FlinkCodeExecutorFactory]
+ }
+ }
override protected def getEngineConnType: EngineType = EngineType.FLINK
@@ -454,7 +479,8 @@ class FlinkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new
FlinkSQLExecutorFactory),
ClassUtil
.getInstance(classOf[FlinkApplicationExecutorFactory], new
FlinkApplicationExecutorFactory),
- ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new
FlinkCodeExecutorFactory)
+ ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new
FlinkCodeExecutorFactory),
+ ClassUtil.getInstance(classOf[FlinkManagerExecutorFactory], new
FlinkManagerExecutorFactory)
)
override def getExecutorFactories: Array[ExecutorFactory] =
executorFactoryArray
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala
new file mode 100644
index 000000000..6bdf43297
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkManagerExecutorFactory.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import
org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory
+import
org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor
+import org.apache.linkis.engineconn.once.executor.OnceExecutor
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
+import org.apache.linkis.engineconnplugin.flink.executor.{
+ FlinkCodeOnceExecutor,
+ FlinkManagerConcurrentExecutor
+}
+import
org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory.setDefaultExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType._
+
class FlinkManagerExecutorFactory extends OnceExecutorFactory {

  /**
   * Creates the manager-mode once executor bound to this EngineConn's Flink context. The new
   * executor is also recorded in the companion object so operators and hooks can reach the
   * single manager executor of the process.
   */
  override protected def newExecutor(
      id: Int,
      engineCreationContext: EngineCreationContext,
      engineConn: EngineConn,
      labels: Array[Label[_]]
  ): OnceExecutor = engineConn.getEngineConnSession match {
    case flinkEngineConnContext: FlinkEngineConnContext =>
      val executor = new FlinkManagerConcurrentExecutor(
        id,
        FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getValue,
        flinkEngineConnContext
      )
      setDefaultExecutor(executor)
      executor
    case other =>
      // BUGFIX: the original match was non-exhaustive and raised an opaque MatchError here.
      throw new IllegalArgumentException(
        s"Invalid engineConn session type: ${if (null == other) "null" else other.getClass.getName}, " +
          "FlinkManagerExecutorFactory requires a FlinkEngineConnContext."
      )
  }

  // Manager mode only serves management operations; JSON is the nominal run type.
  // (Original comment mentioned "sql", which did not match the actual run type.)
  override protected def getSupportRunTypes: Array[String] =
    Array(JSON.toString)

  override protected def getRunType: RunType = JSON
}
+
object FlinkManagerExecutorFactory {

  // The single manager executor of this EngineConn process.
  // BUGFIX: @volatile — the field is written during executor creation and read from other
  // threads (operators/hooks); without it a reader may observe a stale null.
  @volatile private var defaultExecutor: FlinkManagerConcurrentExecutor = _

  def setDefaultExecutor(executor: FlinkManagerConcurrentExecutor): Unit = {
    defaultExecutor = executor
  }

  /** May return null until the manager executor has been created. */
  def getDefaultExecutor(): FlinkManagerConcurrentExecutor = defaultExecutor

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
new file mode 100644
index 000000000..78ae65307
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/EngineLoadOperationHook.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.hook
+
+import org.apache.linkis.common.utils.Logging
+import
org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
+import org.apache.linkis.engineconn.acessible.executor.hook.OperationHook
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import
org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.manager.common.protocol.engine.{
+ EngineOperateRequest,
+ EngineOperateResponse
+}
+
+import org.springframework.stereotype.Service
+
+import javax.annotation.PostConstruct
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+
@Service
class EngineLoadOperationHook extends OperationHook with Logging {

  @PostConstruct
  private def init(): Unit = {
    OperationHook.registerOperationHook(this)
    logger.info(s"${getName()} init success.")
  }

  // Count of engine operations currently in flight on this manager EC.
  private val taskNum = new AtomicInteger(0)

  // Serializes Busy/Unlock status transitions triggered by concurrent operations.
  private val lock = new Object

  override def getName(): String = getClass.getSimpleName

  /**
   * Invoked before an engine operation runs. Refreshes the executor activity time and, once
   * the in-flight count reaches the configured maximum, flips the manager executor to Busy.
   */
  override def doPreOperation(
      engineOperateRequest: EngineOperateRequest,
      engineOperateResponse: EngineOperateResponse
  ): Unit = {
    ExecutorManager.getInstance.getReportExecutor match {
      case accessibleExecutor: AccessibleExecutor =>
        accessibleExecutor.updateLastActivityTime()
      case _ =>
    }
    // BUGFIX: increment exactly once. The original called incrementAndGet() a second time
    // inside the synchronized block, inflating the counter by 2 per operation and leaving
    // the executor stuck in Busy.
    if (
      taskNum.incrementAndGet() >= FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
    ) {
      lock.synchronized {
        // Re-check under the lock before transitioning.
        if (taskNum.get() >= FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()) {
          FlinkManagerExecutorFactory.getDefaultExecutor() match {
            case accessibleExecutor: AccessibleExecutor =>
              if (NodeStatus.Busy != accessibleExecutor.getStatus) {
                accessibleExecutor.transition(NodeStatus.Busy)
                logger.warn("The number of tasks exceeds the maximum limit, change status to busy.")
              }
            case _ => logger.error("FlinkManagerExecutorFactory.getDefaultExecutor() is None.")
          }
        }
      }
    }
  }

  /**
   * Invoked after an engine operation finishes. Decrements the in-flight count and, once the
   * load drops below the configured maximum, unlocks the manager executor again.
   */
  override def doPostOperation(
      engineOperateRequest: EngineOperateRequest,
      engineOperateResponse: EngineOperateResponse
  ): Unit = {
    // BUGFIX: always decrement. The original decremented only when a pre-check on
    // taskNum.get() - 1 passed, so counts leaked whenever the counter sat at the limit.
    if (
      taskNum.decrementAndGet() < FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()
    ) {
      lock.synchronized {
        if (taskNum.get() < FlinkEnvConfiguration.FLINK_MANAGER_LOAD_TASK_MAX.getHotValue()) {
          FlinkManagerExecutorFactory.getDefaultExecutor() match {
            case accessibleExecutor: AccessibleExecutor =>
              if (NodeStatus.Busy == accessibleExecutor.getStatus) {
                accessibleExecutor.transition(NodeStatus.Unlock)
                logger.warn(
                  "The number of tasks is less than the maximum limit, change status to unlock."
                )
              }
            case _ => logger.error("FlinkManagerExecutorFactory.getDefaultExecutor() is None.")
          }
        }
      }
    }
    if (logger.isDebugEnabled()) {
      logger.debug(s"taskNum: ${taskNum.get()}")
    }
  }

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
new file mode 100644
index 000000000..1a917526b
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/KillOperator.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconnplugin.flink.constants.FlinkECConstant
+import
org.apache.linkis.engineconnplugin.flink.operator.clientmanager.FlinkRestClientManager
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.toScalaMap
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId,
FinalApplicationStatus}
+
+import java.util
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable
+
class KillOperator extends Operator with Logging {

  override def getNames: Array[String] = Array("kill")

  /**
   * Kills all running jobs of the given YARN application, optionally triggering a savepoint
   * first (params key SNAPHOT_KEY). If the application has already reached a final status on
   * YARN there is nothing to kill and the operator returns without error.
   */
  @throws[GovernanceErrorException]
  override def apply(params: util.Map[String, Object]): util.Map[String, Object] = {

    val rsMap = new mutable.HashMap[String, String]
    val appIdStr = params.getOrElse(ECConstants.YARN_APPID_NAME_KEY, "").asInstanceOf[String]
    val snapShot = params.getOrElse(FlinkECConstant.SNAPHOT_KEY, "false").toString.toBoolean

    val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)

    var isStopped = false
    val restClient =
      Utils.tryCatch {
        FlinkRestClientManager.getFlinkRestClient(appIdStr)
      } { case e: Exception =>
        // Could not build a REST client; check on YARN whether the app already finished.
        val yarnClient = YarnUtil.getYarnClient()
        val appReport = yarnClient.getApplicationReport(appId)
        if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
          // Flink cluster is not running anymore — treat as already stopped.
          val msg =
            s"The application ${appIdStr} doesn't run anymore. It has previously completed with final status: ${appReport.getFinalApplicationStatus.toString}"
          logAndException(msg)
          isStopped = true
          null
        } else {
          val msg = s"Get client for app ${appIdStr} failed, because : ${e.getMessage}"
          throw logAndException(msg)
        }
      }
    if (!isStopped) {
      if (snapShot) {
        val checkPointPath =
          params.getOrElse(FlinkECConstant.SAVAPOINT_PATH_KEY, null).asInstanceOf[String]
        val rs = YarnUtil.triggerSavepoint(appIdStr, checkPointPath, restClient)
        rsMap.put(FlinkECConstant.MSG_KEY, rs)
      }
      val jobs = restClient.listJobs().get()
      if (null == jobs || jobs.isEmpty) {
        val msg = s"App : ${appIdStr} have no jobs, but is not ended."
        throw logAndException(msg)
      }
      val msg = s"Try to kill ${jobs.size()} jobs of app : ${appIdStr}"
      jobs.asScala.foreach(job => restClient.cancel(job.getJobId))
      rsMap += (FlinkECConstant.MSG_KEY -> msg)
    }

    // BUGFIX: the original evaluated `rsMap.toMap[String, String]` here and discarded the
    // result — dead code removed; the java map below is the actual return value.
    val map = new util.HashMap[String, Object]()
    rsMap.foreach(entry => map.put(entry._1, entry._2))
    map
  }

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
new file mode 100644
index 000000000..1497763de
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/ListOperator.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.common.exception.EngineConnException
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import
org.apache.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorCode
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.{toScalaBuffer, toScalaMap,
BDPJettyServerHelper}
+
+import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus,
YarnApplicationState}
+
+import java.util
+
+import scala.collection.mutable
+
class ListOperator extends Operator with Logging {

  private val json = BDPJettyServerHelper.jacksonJson

  override def getNames: Array[String] = Array("list")

  /**
   * Lists YARN applications filtered by application type, state and (substring) name, and
   * returns them as a JSON array under YARN_APP_RESULT_LIST_KEY. With no state filter given,
   * only RUNNING applications are listed.
   */
  @throws[GovernanceErrorException]
  override def apply(params: util.Map[String, Object]): util.Map[String, Object] = {

    val typeFilter = new util.HashSet[String]()
    var stateFilter = util.EnumSet.of[YarnApplicationState](YarnApplicationState.RUNNING)
    var nameFilter = ""

    Utils.tryCatch {
      val requestedTypes = params
        .getOrElse(ECConstants.YARN_APP_TYPE_LIST_KEY, new util.ArrayList[String]())
        .asInstanceOf[util.List[String]]
      requestedTypes.foreach(typeFilter.add)
      val requestedStates = params
        .getOrElse(ECConstants.YARN_APP_STATE_LIST_KEY, new util.ArrayList[String]())
        .asInstanceOf[util.List[String]]
      val parsedStates = new util.HashSet[YarnApplicationState]
      requestedStates.foreach(state => parsedStates.add(YarnApplicationState.valueOf(state)))
      if (!parsedStates.isEmpty) {
        stateFilter = util.EnumSet.copyOf(parsedStates)
      }
      nameFilter = params.getOrElse(ECConstants.YARN_APP_NAME_KEY, "").asInstanceOf[String]
    } { e: Throwable =>
      val msg = "Invalid params. " + e.getMessage
      logger.error(msg, e)
      throw new EngineConnException(EngineConnExecutorErrorCode.INVALID_PARAMS, msg)
    }

    val yarnClient = YarnUtil.getYarnClient()
    val reports = yarnClient.getApplications(typeFilter, stateFilter)
    val rsMap = new mutable.HashMap[String, String]
    Utils.tryCatch {
      val appTypeStr = json.writeValueAsString(typeFilter)
      val appStateStr = json.writeValueAsString(stateFilter)
      val matchedApps = new util.ArrayList[util.Map[String, String]]()
      reports.foreach { report =>
        // An empty nameFilter matches every application.
        if (report.getName.contains(nameFilter)) {
          val appItem = new util.HashMap[String, String]()
          appItem.put(ECConstants.YARN_APP_NAME_KEY, report.getName)
          appItem.put(ECConstants.YARN_APP_TYPE_KEY, report.getApplicationType)
          appItem.put(ECConstants.YARN_APPID_NAME_KEY, report.getApplicationId.toString)
          appItem.put(ECConstants.YARN_APP_URL_KEY, report.getTrackingUrl)
          // Prefer the final status once the application has completed.
          val effectiveStatus =
            if (report.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
              report.getFinalApplicationStatus
            } else {
              report.getYarnApplicationState
            }
          appItem.put(
            ECConstants.NODE_STATUS_KEY,
            YarnUtil
              .convertYarnStateToNodeStatus(report.getApplicationId.toString, effectiveStatus.toString)
              .toString
          )
          matchedApps.add(appItem)
        }
      }
      val listStr = json.writeValueAsString(matchedApps)

      logger.info(
        s"List yarn apps, params : appTypeSet : ${appTypeStr}, appStateSet : ${appStateStr}, list : ${listStr}"
      )

      rsMap += (ECConstants.YARN_APP_RESULT_LIST_KEY -> listStr)
    } { case e: Exception =>
      val msg = "convert listStr failed. Because : " + e.getMessage
      logger.error(msg)
      throw e
    }

    val result = new util.HashMap[String, Object]()
    rsMap.foreach(entry => result.put(entry._1, entry._2))
    result
  }

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala
new file mode 100644
index 000000000..05f1200bb
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/StatusOperator.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.exception.{LinkisException,
LinkisRuntimeException}
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.engineconn.common.exception.EngineConnException
+import org.apache.linkis.engineconnplugin.flink.util.{ManagerUtil, YarnUtil}
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import
org.apache.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorCode
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+import org.apache.linkis.manager.common.operator.Operator
+import org.apache.linkis.server.toScalaMap
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport,
FinalApplicationStatus}
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+
+import java.util
+
+import scala.collection.mutable
+
class StatusOperator extends Operator with Logging {

  override def getNames: Array[String] = Array("status")

  /**
   * Queries YARN for the report of the given application and converts it to a Linkis
   * NodeStatus. An application unknown to YARN is reported as Failed rather than an error.
   */
  override def apply(params: util.Map[String, Object]): util.Map[String, Object] = {

    val appIdStr = params.getOrElse(ECConstants.YARN_APPID_NAME_KEY, "").asInstanceOf[String]

    // CONSISTENCY/ROBUSTNESS FIX: parse the id via YarnUtil.retrieveApplicationId, like
    // KillOperator and TriggerSavepointOperator, instead of hand-splitting on '_' which
    // threw ArrayIndexOutOfBounds/NumberFormat exceptions on malformed ids.
    val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)

    val yarnClient = YarnUtil.getYarnClient()
    val appReport: ApplicationReport = Utils.tryCatch {
      val report = yarnClient.getApplicationReport(appId)
      if (null == report) {
        throw logAndException(s"Got null appReport for appid : ${appIdStr}")
      }
      report
    } { case notExist: ApplicationNotFoundException =>
      logger.error(s"Application : ${appIdStr} not exists, will set the status to failed.")
      // Signal "not found" to the branch below instead of a nonlocal `return` from inside
      // the handler lambda (which relied on NonLocalReturnControl).
      null
    }

    val map = new util.HashMap[String, Object]()
    if (null == appReport) {
      map.put(ECConstants.NODE_STATUS_KEY, NodeStatus.Failed.toString)
      map.put(ECConstants.YARN_APPID_NAME_KEY, appIdStr)
    } else {
      // Prefer the final status once the application has completed.
      val appStatus =
        if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
          appReport.getFinalApplicationStatus
        } else {
          appReport.getYarnApplicationState
        }
      val nodeStatus: NodeStatus = YarnUtil.convertYarnStateToNodeStatus(appIdStr, appStatus.toString)
      logger.info(s"try to get appid: ${appIdStr}, status ${nodeStatus.toString}.")
      map.put(ECConstants.NODE_STATUS_KEY, nodeStatus.toString)
      map.put(ECConstants.YARN_APPID_NAME_KEY, appIdStr)
    }
    map
  }

}
+
object StatusOperator extends Logging {

  // BUGFIX: @volatile — the flag is set and read by different threads; without it the
  // update made by addHandshake() may never become visible to readers.
  @volatile private var handshaked: Boolean = false

  /** Marks that the manager EC has completed its handshake. */
  def addHandshake(): Unit = {
    handshaked = true
  }

  def isHandshaked: Boolean = handshaked

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
new file mode 100644
index 000000000..ceddc367c
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager
+import org.apache.linkis.engineconnplugin.flink.constants.FlinkECConstant
+import
org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary._
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor
+import
org.apache.linkis.engineconnplugin.flink.operator.clientmanager.FlinkRestClientManager
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.governance.common.exception.GovernanceErrorException
+import org.apache.linkis.manager.common.operator.Operator
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+
+import java.text.MessageFormat
+import java.util
+
+import scala.collection.mutable
+
class TriggerSavepointOperator extends Operator with Logging {

  override def getNames: Array[String] = Array("doSavepoint")

  /**
   * Triggers a savepoint for the given YARN application. In detach mode the savepoint is
   * taken through the cached REST client; otherwise the local Flink once-executor's cluster
   * descriptor is used. Fails fast if the application already finished on YARN.
   */
  @throws[GovernanceErrorException]
  override def apply(params: util.Map[String, Object]): util.Map[String, Object] = {
    val rsMap = new mutable.HashMap[String, String]

    val savepointPath = getAsThrow[String](params, FlinkECConstant.SAVAPOINT_PATH_KEY)
    val appIdStr = getAsThrow[String](params, ECConstants.YARN_APPID_NAME_KEY)
    val mode = getAsThrow[String](params, FlinkECConstant.SAVEPOINT_MODE_KEY)

    val appId = YarnUtil.retrieveApplicationId(appIdStr)
    val yarnClient = YarnUtil.getYarnClient()
    val appReport = yarnClient.getApplicationReport(appId)
    if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
      // Flink cluster is not running anymore
      val msg =
        s"The application ${appIdStr} doesn't run anymore. It has previously completed with final status: ${appReport.getFinalApplicationStatus.toString}"
      throw logAndException(msg)
    }

    logger.info(s"try to $mode savepoint with path $savepointPath.")
    if (
      YarnUtil.isDetach(
        EngineConnServer.getEngineCreationContext.getOptions.asInstanceOf[util.Map[String, Any]]
      )
    ) {
      logger.info("The flink cluster is detached, use rest api to trigger savepoint.")
      val restClient = FlinkRestClientManager.getFlinkRestClient(appIdStr)
      val rs = YarnUtil.triggerSavepoint(appIdStr, savepointPath, restClient)
      rsMap.put(FlinkECConstant.RESULT_SAVEPOINT_KEY, rs)
    } else {
      logger.info("The flink cluster is not detached, use flink client to trigger savepoint.")
      OnceExecutorManager.getInstance.getReportExecutor match {
        case flinkExecutor: FlinkOnceExecutor[_] =>
          val writtenSavepoint =
            flinkExecutor.getClusterDescriptorAdapter.doSavepoint(savepointPath, mode)
          rsMap.put(FlinkECConstant.RESULT_SAVEPOINT_KEY, writtenSavepoint)
        case executor =>
          // BUGFIX: the original message prepended the raw error description and class name
          // to the MessageFormat output, duplicating both; the formatted text alone is correct.
          throw new JobExecutionException(
            MessageFormat.format(NOT_SUPPORT_SAVEPOTION.getErrorDesc, executor.getClass.getSimpleName)
          )
      }
    }
    val map = new util.HashMap[String, Object]()
    rsMap.foreach(entry => map.put(entry._1, entry._2))
    map
  }

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
new file mode 100644
index 000000000..d7a14beb4
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/clientmanager/FlinkRestClientManager.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator.clientmanager
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import
org.apache.linkis.engineconnplugin.flink.executor.FlinkManagerConcurrentExecutor
+import
org.apache.linkis.engineconnplugin.flink.factory.FlinkManagerExecutorFactory
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil
+import org.apache.linkis.engineconnplugin.flink.util.YarnUtil.logAndException
+
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.Configuration
+import org.apache.hadoop.yarn.api.records.{ApplicationId,
FinalApplicationStatus}
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{
+ CacheBuilder,
+ CacheLoader,
+ LoadingCache,
+ RemovalListener,
+ RemovalNotification
+}
+
object FlinkRestClientManager extends Logging {

  /**
   * Cache of RestClusterClient keyed by YARN application id string. Entries expire after a
   * configurable idle period and are bounded in number; evicted clients are closed.
   *
   * BUGFIX: the original builder used weakKeys(), which makes Guava compare keys by identity
   * (==) instead of equals(); with String keys that meant lookups essentially never hit the
   * cache and a fresh RestClusterClient was built on every call.
   */
  private lazy val restclientCache: LoadingCache[String, RestClusterClient[ApplicationId]] =
    CacheBuilder
      .newBuilder()
      .maximumSize(FlinkEnvConfiguration.FLINK_MANAGER_CLIENT_MAX_NUM.getValue)
      .expireAfterAccess(
        FlinkEnvConfiguration.FLINK_MANAGER_CLIENT_EXPIRE_MILLS.getValue,
        TimeUnit.MILLISECONDS
      )
      .removalListener(new RemovalListener[String, RestClusterClient[ApplicationId]]() {

        override def onRemoval(
            notification: RemovalNotification[String, RestClusterClient[ApplicationId]]
        ): Unit = {
          logger.info(s"RestClusterClient of AppId : ${notification.getKey} was removed.")
          // BUGFIX: close the evicted client, otherwise its underlying REST connections leak.
          if (null != notification.getValue) {
            try notification.getValue.close()
            catch {
              case e: Exception =>
                logger.warn(s"Failed to close RestClusterClient of AppId : ${notification.getKey}", e)
            }
          }
        }

      })
      .build(new CacheLoader[String, RestClusterClient[ApplicationId]]() {

        /** Builds a REST client for a still-running application, failing if it has finished. */
        override def load(appIdStr: String): RestClusterClient[ApplicationId] = {

          val appId: ApplicationId = YarnUtil.retrieveApplicationId(appIdStr)

          val yarnClient = YarnUtil.getYarnClient()
          val appReport = yarnClient.getApplicationReport(appId)

          if (appReport.getFinalApplicationStatus != FinalApplicationStatus.UNDEFINED) {
            // Flink cluster is not running anymore
            val msg =
              s"The application ${appIdStr} doesn't run anymore. It has previously completed with final status: ${appReport.getFinalApplicationStatus.toString}"
            throw logAndException(msg)
          }

          val executor = FlinkManagerExecutorFactory.getDefaultExecutor()
          val tmpFlinkConf: Configuration = executor match {
            case flinkManagerExecutor: FlinkManagerConcurrentExecutor =>
              // Clone so per-app entrypoint info does not pollute the shared configuration.
              flinkManagerExecutor.getFlinkContext().getEnvironmentContext.getFlinkConfig.clone()
            case _ =>
              val msg = s"Invalid FlinkManagerConcurrentExecutor : ${executor}"
              throw logAndException(msg)
          }
          YarnUtil.setClusterEntrypointInfoToConfig(tmpFlinkConf, appReport)
          new RestClusterClient[ApplicationId](tmpFlinkConf, appReport.getApplicationId)
        }

      })

  /** Returns the cached REST client for the application, loading it if absent. */
  def getFlinkRestClient(appIdStr: String): RestClusterClient[ApplicationId] =
    restclientCache.get(appIdStr)

  /** Replaces the cached REST client for the application. */
  def setFlinkRestClient(appIdStr: String, client: RestClusterClient[ApplicationId]): Unit =
    restclientCache.put(appIdStr, client)

}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
similarity index 63%
copy from
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
copy to
linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
index e3b8548bf..a4dec1cf8 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/ManagerUtil.scala
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.common.entity.node;
+package org.apache.linkis.engineconnplugin.flink.util
-public interface EngineNode extends AMNode, RMNode, LabelNode {
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
- EMNode getEMNode();
+import java.util
- void setEMNode(EMNode emNode);
object ManagerUtil extends Logging {

  // True when this EngineConn was created with Flink manager (detach) mode enabled.
  // Evaluated once at object initialization from the engine creation options.
  val isManager: Boolean =
    FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue(
      EngineConnServer.getEngineCreationContext.getOptions
    )

}
diff --git
a/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
new file mode 100644
index 000000000..a1c96619e
--- /dev/null
+++
b/linkis-engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/util/YarnUtil.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.util
+
+import org.apache.linkis.common.exception.ErrorException
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineconn.executor.entity.YarnExecutor
+import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.ec.ECConstants
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink
+import org.apache.flink.client.program.rest.RestClusterClient
+import org.apache.flink.configuration.{HighAvailabilityOptions,
JobManagerOptions, RestOptions}
+import org.apache.flink.runtime.client.JobStatusMessage
+import org.apache.flink.yarn.configuration.YarnConfigOptions
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{
+ ApplicationId,
+ ApplicationReport,
+ FinalApplicationStatus,
+ YarnApplicationState
+}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+
+import java.util
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.mutable.ArrayBuffer
+
object YarnUtil extends Logging {

  val CORE_SITE = "core-site.xml"
  val YARN_SITE = "yarn-site.xml"
  val HDFS_SITE = "hdfs-site.xml"
  val MAPRED_SITE = "mapred-site.xml"

  // FIX: must be @volatile — getYarnClient() reads this field outside the
  // synchronized block (double-checked locking); without a volatile write,
  // another thread could observe a partially-published YarnClient.
  @volatile private var yarnClient: YarnClient = _

  /**
   * Returns the process-wide [[YarnClient]], creating and starting it on first
   * use. Thread-safe via double-checked locking on the companion class monitor.
   */
  def getYarnClient(): YarnClient = {
    if (null == yarnClient) {
      YarnUtil.getClass.synchronized {
        if (null == yarnClient) {
          yarnClient = createYarnClient()
        }
      }
    }
    yarnClient
  }

  /** Builds and starts a YarnClient backed by the configured Hadoop conf dir. */
  private def createYarnClient(): YarnClient = {
    val client = YarnClient.createYarnClient()
    val yarnConfiguration = new YarnConfiguration(getHadoopConf())
    client.init(yarnConfiguration)
    client.start()
    client
  }

  /**
   * Loads the core/hdfs/mapred/yarn site files from the directory configured by
   * FlinkEnvConfiguration.HADOOP_CONF_DIR.
   *
   * @throws JobExecutionException when the conf directory is not configured
   */
  private def getHadoopConf(): Configuration = {
    val conf = new Configuration()
    val confRoot = FlinkEnvConfiguration.HADOOP_CONF_DIR.getValue
    if (StringUtils.isBlank(confRoot)) {
      throw new JobExecutionException("HADOOP_CONF_DIR or linkis.flink.hadoop.conf.dir not set!")
    }
    val prefix = confRoot + "/"
    Seq(HDFS_SITE, CORE_SITE, MAPRED_SITE, YARN_SITE).foreach(site => conf.addResource(prefix + site))
    conf
  }

  /**
   * Copies the JobManager/REST endpoint and application id from a yarn
   * [[ApplicationReport]] into the given Flink configuration, so a rest client
   * built from it targets the running cluster.
   *
   * @throws JobExecutionException when appReport is null
   */
  def setClusterEntrypointInfoToConfig(
      flinkConfiguration: flink.configuration.Configuration,
      appReport: ApplicationReport
  ): Unit = {
    if (null == appReport) {
      val msg = "Invalid null appReport"
      logger.error(msg)
      throw new JobExecutionException(msg)
    }

    val appId = appReport.getApplicationId
    val host = appReport.getHost
    val port = appReport.getRpcPort

    logger.info(s"Found Web Interface ${host}:${port} of application '${appId}'.")

    flinkConfiguration.setString(JobManagerOptions.ADDRESS, host)
    flinkConfiguration.setInteger(JobManagerOptions.PORT, port)

    flinkConfiguration.setString(RestOptions.ADDRESS, host)
    flinkConfiguration.setInteger(RestOptions.PORT, port)

    flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId))

    // Only seed the HA cluster id when the caller has not set one explicitly.
    if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
      flinkConfiguration.set(HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId))
    }
  }

  /**
   * Logs msg (with the optional cause t) and returns a [[JobExecutionException]]
   * for the caller to throw. NOTE: the cause is logged but not attached to the
   * returned exception, matching the pre-existing behavior.
   */
  def logAndException(msg: String, t: Throwable = null): ErrorException = {
    logger.error(msg, t)
    new JobExecutionException(msg)
  }

  /**
   * Parses a yarn application id string, e.g. "application_1690000000000_0001",
   * into an [[ApplicationId]].
   *
   * FIX: the original threw raw ArrayIndexOutOfBoundsException /
   * NumberFormatException on malformed input; malformed ids now raise a
   * descriptive JobExecutionException instead.
   */
  def retrieveApplicationId(appIdStr: String): ApplicationId = {
    val parts = if (null == appIdStr) Array.empty[String] else appIdStr.split("_")
    if (parts.length < 3) {
      throw logAndException(s"Invalid yarn application id : ${appIdStr}")
    }
    try {
      // Create an ApplicationId object using newInstance method
      ApplicationId.newInstance(parts(1).toLong, parts(2).toInt)
    } catch {
      case e: NumberFormatException =>
        throw logAndException(s"Invalid yarn application id : ${appIdStr}", e)
    }
  }

  /**
   * Triggers a savepoint for the single job of the given yarn application.
   *
   * @param appIdStr       yarn application id (used only in error messages)
   * @param checkPointPath target directory for the savepoint; must be non-blank
   * @param restClient     Flink rest client attached to the cluster
   * @return the savepoint location reported by Flink
   * @throws JobExecutionException when the app has zero or more than one job,
   *                               or the checkpoint path is blank
   */
  def triggerSavepoint(
      appIdStr: String,
      checkPointPath: String,
      restClient: RestClusterClient[ApplicationId]
  ): String = {
    val jobs = restClient.listJobs().get()
    if (null == jobs || jobs.size() > 1) {
      val size = if (null == jobs) 0 else jobs.size()
      throw logAndException(s"App : ${appIdStr} have ${size} jobs, cannot do snapshot.")
    }
    if (StringUtils.isBlank(checkPointPath)) {
      throw logAndException(s"App : ${appIdStr} checkpoint path is null, cannot do checkpoint")
    }
    // FIX: replaces headOption.getOrElse(null).asInstanceOf[JobStatusMessage]
    // (null sentinel + unchecked cast) with a direct Option match.
    jobs.asScala.headOption match {
      case Some(firstJob) =>
        restClient.triggerSavepoint(firstJob.getJobId, checkPointPath).get()
      case None =>
        throw new JobExecutionException(
          s"App : ${appIdStr} got no head job, cannot do checkPoint and cancel."
        )
    }
  }

  // Builds the shared "unknown/null state" exception (was duplicated inline twice).
  private def unknownStateException(appIdStr: String, appStatus: String): JobExecutionException = {
    val msg: String = if (null != appStatus) {
      s"Application : ${appIdStr} has unknown state : ${appStatus.toString}"
    } else {
      s"Application : ${appIdStr} has null state"
    }
    new JobExecutionException(msg)
  }

  /**
   * Maps a yarn application state name (either a [[FinalApplicationStatus]] or a
   * [[YarnApplicationState]] value) to a Linkis [[NodeStatus]].
   *
   * @throws JobExecutionException for FINISHED (ambiguous final outcome),
   *                               unknown, or null states
   */
  def convertYarnStateToNodeStatus(appIdStr: String, appStatus: String): NodeStatus = {
    appStatus match {
      case finalState if FinalApplicationStatus.values().map(_.toString).contains(finalState) =>
        FinalApplicationStatus.valueOf(finalState) match {
          case FinalApplicationStatus.KILLED | FinalApplicationStatus.FAILED =>
            NodeStatus.Failed
          case FinalApplicationStatus.SUCCEEDED =>
            NodeStatus.Success
          case _ =>
            throw unknownStateException(appIdStr, appStatus)
        }
      case yarnState if YarnApplicationState.values().map(_.toString).contains(yarnState) =>
        YarnApplicationState.valueOf(yarnState) match {
          case YarnApplicationState.FINISHED =>
            // FINISHED alone does not say whether the app succeeded; callers
            // must consult the final status instead.
            throw new JobExecutionException("Invalid yarn app state : FINISHED")
          case YarnApplicationState.KILLED | YarnApplicationState.FAILED =>
            NodeStatus.Failed
          case _ =>
            NodeStatus.Running
        }
      case _ =>
        throw unknownStateException(appIdStr, appStatus)
    }
  }

  /**
   * Whether the EC should run in detach manage mode: either the Flink manager
   * mode flag is enabled in params, or the EC app manage mode is "detach".
   *
   * FIX: the original called managerOn.toString.toBoolean, which throws
   * IllegalArgumentException for any value other than "true"/"false"; any
   * non-"true" value is now simply treated as disabled.
   */
  def isDetach(params: util.Map[String, Any]): Boolean = {
    val managerOn = params.getOrDefault(
      FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.key,
      FlinkEnvConfiguration.FLINK_MANAGER_MODE_CONFIG_KEY.getValue
    )
    if (null != managerOn && "true".equalsIgnoreCase(managerOn.toString.trim)) {
      true
    } else {
      val clientType = params
        .getOrDefault(
          GovernanceCommonConf.EC_APP_MANAGE_MODE.key,
          GovernanceCommonConf.EC_APP_MANAGE_MODE.getValue
        )
        .toString
      logger.info(s"clientType : ${clientType}")
      ECConstants.EC_CLIENT_TYPE_DETACH == clientType.toLowerCase()
    }
  }

  /**
   * Collects the yarn application ids of all registered [[YarnExecutor]]s.
   * Logs an error (but still returns all ids) when more than one yarn
   * application is running, since this EC expects at most one.
   */
  def getAppIds: Array[String] = {
    val ids = new ArrayBuffer[String]
    ExecutorManager.getInstance.getExecutors.foreach {
      case yarnExecutor: YarnExecutor => ids.append(yarnExecutor.getApplicationId)
      case _ =>
    }
    if (ids.size > 1) {
      logger.error(
        "There are more than one yarn application running, please check it. Ids : " + ids
          .mkString(",")
      )
    }
    ids.toArray
  }

}
diff --git
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
index b637da27e..abae56eb7 100644
---
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/creation/HiveEngineConnFactory.scala
@@ -97,7 +97,7 @@ class HiveEngineConnFactory extends
ComputationSingleExecutorEngineConnFactory w
engineCreationContext: EngineCreationContext
): AbstractHiveSession = {
// if hive engine support concurrent, return HiveConcurrentSession
- if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+ if
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getHotValue()) {
return doCreateHiveConcurrentSession(engineCreationContext.getOptions)
}
diff --git
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
index cbfffc244..85baceef6 100755
---
a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
@@ -38,7 +38,7 @@ class ShellEngineConnFactory extends
ComputationSingleExecutorEngineConnFactory
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): LabelExecutor = {
- if (AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM) {
+ if
(AccessibleExecutorConfiguration.ENGINECONN_SUPPORT_PARALLELISM.getValue) {
new ShellEngineConnConcurrentExecutor(
id,
ShellEngineConnConf.SHELL_ENGINECONN_CONCURRENT_LIMIT
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]