This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new e124d91a6 [Feature][linkis-engineplugin-spark] spark submit jar (engineConnMode is once) (#3966)
e124d91a6 is described below
commit e124d91a652efd6a77aa2cff952bbacef6875e6d
Author: rarexixi <[email protected]>
AuthorDate: Tue Dec 20 11:37:45 2022 +0800
[Feature][linkis-engineplugin-spark] spark submit jar (engineConnMode is once) (#3966)
* [Feature][linkis-engineplugin-spark] spark submit jar (engineConnMode is once)
---
.../once/action/CreateEngineConnAction.scala | 2 +-
.../computation/client/SparkOnceJobTest.java | 127 +++++
.../engineconn/common/conf/EngineConnConf.scala | 5 +
.../engineconn/common/hook/EngineConnHook.scala | 36 +-
.../engineconn/launch/EngineConnServer.scala | 32 +-
.../common/launch/EngineConnLaunchBuilder.scala | 7 +-
.../process/ProcessEngineConnLaunchBuilder.scala | 4 +-
.../service/DefaultEngineConnLaunchService.scala | 24 +-
.../label/entity/engine/EngineConnModeLabel.scala | 5 +
.../linkis/manager/label/utils/LabelUtil.scala | 15 +
.../openlookeng/OpenLooKengECPlugin.scala | 10 +-
.../seatunnel/SeatunnelEngineConnPlugin.scala | 7 +-
linkis-engineconn-plugins/spark/pom.xml | 12 +
.../spark/client/context/ExecutionContext.java | 17 +-
.../spark/client/context/SparkConfig.java | 370 +++++++++++++++
.../deployment/ClusterDescriptorAdapter.java | 98 ++++
.../ClusterDescriptorAdapterFactory.java | 16 +-
.../YarnApplicationClusterDescriptorAdapter.java | 122 +++++
.../spark/errorcode/SparkErrorCodeSummary.java | 7 +
.../spark/exception/ExecutorInitException.java | 15 +-
.../spark/exception/JobExecutionException.java | 15 +-
.../main/resources/linkis-engineconn.properties | 2 +
.../engineplugin/spark/SparkEngineConnPlugin.scala | 6 +-
.../spark/config/SparkConfiguration.scala | 18 +-
.../spark/context/EnvironmentContext.scala | 65 +++
.../spark/context/SparkEngineConnContext.scala | 18 +-
.../spark/executor/SparkExecutor.scala | 64 +++
.../spark/executor/SparkOnceExecutor.scala | 158 +++++++
.../spark/executor/SparkSubmitOnceExecutor.scala | 145 ++++++
.../spark/factory/SparkEngineConnFactory.scala | 79 +++-
.../spark/factory/SparkOnceExecutorFactory.scala | 44 ++
.../launch/SparkEngineConnLaunchBuilder.scala | 171 +++++++
...SparkSubmitProcessEngineConnLaunchBuilder.scala | 522 +++++----------------
.../spark/utils/SparkJobProgressUtil.scala | 116 +++++
...SparkSubmitProcessEngineConnLaunchBuilder.scala | 10 +-
.../sqoop/SqoopEngineConnPlugin.scala | 7 +-
36 files changed, 1839 insertions(+), 532 deletions(-)
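For context, the client-side shape of the new once mode can be condensed into the following Java sketch. It is distilled from the SparkOnceJobTest added by this commit; the gateway URL, engine version, jar path and label values are the test's sample values, not fixed requirements:

    // Minimal once-mode submission, condensed from SparkOnceJobTest (values are sample data).
    LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");
    SubmittableSimpleOnceJob onceJob =
        LinkisJobClient.once()
            .simple()
            .builder()
            .setCreateService("Spark-Test")
            .addExecuteUser("linkis")
            .addJobContent("runType", "jar")
            .addJobContent("spark.app.main.class", "org.apache.spark.examples.JavaWordCount")
            .addLabel("engineType", "spark-2.4.7")
            .addLabel("userCreator", "linkis-IDE")
            .addLabel("engineConnMode", "once") // selects the once-mode hooks and launch builder
            .addStartupParam("spark.app.resource", "hdfs:///spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
            .build();
    onceJob.submit();

The full test below additionally polls engine logs through EngineConnLogOperator and waits for completion.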
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
index 403644e4d..f5210a1b8 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
@@ -89,7 +89,7 @@ object CreateEngineConnAction {
action.addRequestPayload("properties", properties)
action.addRequestPayload("labels", labels)
action.addRequestPayload("createService", createService)
- action.addRequestPayload("timeOut", maxSubmitTime)
+ action.addRequestPayload("timeout", maxSubmitTime)
action.addRequestPayload("description", description)
action.addRequestPayload("ignoreTimeout", ignoreTimeout)
action
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
new file mode 100644
index 000000000..963c8569f
--- /dev/null
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client;
+
+import org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
+import org.apache.linkis.computation.client.operator.impl.*;
+
+import java.util.ArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkOnceJobTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(SparkOnceJobTest.class);
+
+ public static void main(String[] args) throws InterruptedException {
+
+ LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");
+
+ String submitUser = "linkis";
+ String engineType = "spark";
+
+ SubmittableSimpleOnceJob onceJob =
+ // region
+ LinkisJobClient.once()
+ .simple()
+ .builder()
+ .setCreateService("Spark-Test")
+ .setMaxSubmitTime(300000)
+ .setDescription("SparkTestDescription")
+ .addExecuteUser(submitUser)
+ .addJobContent("runType", "jar")
+ .addJobContent("spark.app.main.class",
"org.apache.spark.examples.JavaWordCount")
+ .addJobContent("spark.app.args", "hdfs:///tmp/log.log -a 10 -b=12")
+ .addJobContent(
+ "spark.extconf",
"spark.a=d\nspark.c=d\nspark.args.start_date=2022-06-14")
+ .addLabel("engineType", engineType + "-2.4.7")
+ .addLabel("userCreator", submitUser + "-IDE")
+ .addLabel("engineConnMode", "once")
+ .addStartupParam("spark.app.name", "spark-submit-jar-test-xi")
+ .addStartupParam("spark.executor.memory", "1g")
+ .addStartupParam("spark.driver.memory", "1g")
+ .addStartupParam("spark.executor.cores", "1")
+ .addStartupParam("spark.executor.instance", "1")
+ .addStartupParam(
+ "spark.app.resource",
"hdfs:///spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
+ .addSource("jobName", "OnceJobTest")
+ .build();
+ // endregion
+ onceJob.submit();
+ logger.info("jobId: {}", onceJob.getId());
+
+ EngineConnLogOperator logOperator =
+ (EngineConnLogOperator) onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME());
+ int fromLine = 1;
+ int logSize = 1000; // log lines
+ logOperator.setPageSize(logSize);
+ logOperator.setEngineConnType("spark");
+
+ ArrayList<String> logLines;
+
+ // print log
+ while (true) {
+ try {
+ logOperator.setFromLine(fromLine);
+ EngineConnLogs logs = (EngineConnLogs) logOperator.apply();
+ logLines = logs.logs();
+ if (logLines == null || logLines.isEmpty()) {
+ if (!isCompleted(onceJob, 3)) {
+ Thread.sleep(2000);
+ continue;
+ } else {
+ break;
+ }
+ }
+ for (String log : logLines) {
+ System.out.println(log);
+ }
+ fromLine += logLines.size();
+ } catch (Exception e) {
+ logger.error("Failed to get log information", e);
+ break;
+ }
+ }
+
+ boolean complete = false;
+ // wait complete
+ while (!complete) {
+ try {
+ complete = onceJob.isCompleted();
+ } catch (Exception e) {
+ logger.error("isCompleted error", e);
+ }
+ }
+
+ String finalStatus = onceJob.getStatus();
+ logger.info("final status " + finalStatus);
+ }
+
+ static boolean isCompleted(SubmittableSimpleOnceJob onceJob, int times)
+ throws InterruptedException {
+ if (times == 0) return false;
+ try {
+ return onceJob.isCompleted();
+ } catch (Exception e) {
+ logger.error("isCompleted error", e);
+ Thread.sleep(2000);
+ return isCompleted(onceJob, times - 1);
+ }
+ }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
index cbaa23376..1518f00a6 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
@@ -35,6 +35,11 @@ object EngineConnConf {
"org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook"
)
+ val ENGINE_CONN_ONCE_HOOKS = CommonVars(
+ "linkis.engine.connector.once.hooks",
+ "org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook"
+ )
+
val ENGINE_LAUNCH_CMD_PARAMS_USER_KEY =
CommonVars("wds.linkis.engine.launch.cmd.params.user.key", "user")
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
index 5a73678a1..f60bf6d65 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
@@ -52,27 +52,33 @@ trait EngineConnHook {
object EngineConnHook extends Logging {
- private val engineConnHooks = initEngineConnHooks
+ private var engineConnHooks: Array[EngineConnHook] = _
- private def initEngineConnHooks: Array[EngineConnHook] = {
+ private def initEngineConnHooks(isOnceMode: Boolean): Unit = {
- val hooks = EngineConnConf.ENGINE_CONN_HOOKS.getValue
+ val hooks = if (isOnceMode) {
+ EngineConnConf.ENGINE_CONN_ONCE_HOOKS.getValue
+ } else {
+ EngineConnConf.ENGINE_CONN_HOOKS.getValue
+ }
if (StringUtils.isNotBlank(hooks)) {
- val clazzArr = hooks.split(",")
- if (null != clazzArr && clazzArr.nonEmpty) {
- clazzArr
- .map { clazz =>
- Utils.tryAndWarn(Utils.getClassInstance[EngineConnHook](clazz))
- }
- .filter(_ != null)
- } else {
- Array.empty
- }
+ engineConnHooks = hooks
+ .split(",")
+ .map(_.trim)
+ .filter(StringUtils.isNotBlank)
+ .map(Utils.tryAndWarn(Utils.getClassInstance[EngineConnHook](_)))
+ .filter(_ != null)
} else {
- Array.empty
+ engineConnHooks = Array.empty
}
}
- def getEngineConnHooks: Array[EngineConnHook] = engineConnHooks
+ def getEngineConnHooks(isOnceMode: Boolean = false): Array[EngineConnHook] = {
+ if (engineConnHooks == null) {
+ initEngineConnHooks(isOnceMode)
+ }
+ engineConnHooks
+ }
+
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
index 88f9112b3..92bd3cd75 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
@@ -46,6 +46,7 @@ import org.apache.linkis.manager.label.builder.factory.{
LabelBuilderFactoryContext
}
import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.EngineConnMode
import org.apache.commons.lang3.exception.ExceptionUtils
@@ -59,6 +60,8 @@ object EngineConnServer extends Logging {
private val labelBuilderFactory: LabelBuilderFactory =
LabelBuilderFactoryContext.getLabelBuilderFactory
+ private var onceMode: Boolean = false
+
def main(args: Array[String]): Unit = {
logger.info("<<---------------------EngineConnServer Start
--------------------->>")
@@ -76,14 +79,13 @@ object EngineConnServer extends Logging {
"Finished to create EngineCreationContext, EngineCreationContext
content: " + EngineConnUtils.GSON
.toJson(engineCreationContext)
)
- EngineConnHook.getEngineConnHooks.foreach(_.beforeCreateEngineConn(getEngineCreationContext))
+ val ecHooks = EngineConnHook.getEngineConnHooks(onceMode)
+ ecHooks.foreach(_.beforeCreateEngineConn(getEngineCreationContext))
logger.info("Finished to execute hook of beforeCreateEngineConn.")
// 2. cresate EngineConn
val engineConn =
getEngineConnManager.createEngineConn(getEngineCreationContext)
logger.info(s"Finished to create ${engineConn.getEngineConnType}
EngineConn.")
- EngineConnHook.getEngineConnHooks.foreach(
- _.beforeExecutionExecute(getEngineCreationContext, engineConn)
- )
+ ecHooks.foreach(_.beforeExecutionExecute(getEngineCreationContext, engineConn))
logger.info("Finished to execute all hooks of beforeExecutionExecute.")
// 3. register executions
Utils.tryThrow(executeEngineConn(engineConn)) { t =>
@@ -96,19 +98,14 @@ object EngineConnServer extends Logging {
}
EngineConnObject.setReady()
logger.info("Finished to execute executions.")
- EngineConnHook.getEngineConnHooks.foreach(
- _.afterExecutionExecute(getEngineCreationContext, engineConn)
- )
+ ecHooks.foreach(_.afterExecutionExecute(getEngineCreationContext, engineConn))
logger.info("Finished to execute hook of afterExecutionExecute")
- EngineConnHook.getEngineConnHooks.foreach(
- _.afterEngineServerStartSuccess(getEngineCreationContext, engineConn)
- )
+ ecHooks.foreach(_.afterEngineServerStartSuccess(getEngineCreationContext, engineConn))
} catch {
case t: Throwable =>
logger.error("EngineConnServer Start Failed.", t)
- EngineConnHook.getEngineConnHooks.foreach(
- _.afterEngineServerStartFailed(getEngineCreationContext, t)
- )
+ val ecHooks = EngineConnHook.getEngineConnHooks(onceMode)
+ ecHooks.foreach(_.afterEngineServerStartFailed(getEngineCreationContext, t))
System.exit(1)
}
@@ -132,8 +129,11 @@ object EngineConnServer extends Logging {
val labelArgs =
engineConf.filter(_._1.startsWith(EngineConnArgumentsParser.LABEL_PREFIX))
if (labelArgs.nonEmpty) {
labelArgs.foreach { case (key, value) =>
- labels += labelBuilderFactory
- .createLabel[Label[_]](key.replace(EngineConnArgumentsParser.LABEL_PREFIX, ""), value)
+ val realKey = key.substring(EngineConnArgumentsParser.LABEL_PREFIX.length)
+ if ("engineConnMode".equals(realKey)) {
+ onceMode = EngineConnMode.isOnceMode(value)
+ }
+ labels += labelBuilderFactory.createLabel[Label[_]](realKey, value)
}
engineCreationContext.setLabels(labels.toList.asJava)
}
@@ -160,6 +160,8 @@ object EngineConnServer extends Logging {
}
}
+ def isOnceMode: Boolean = this.onceMode
+
def getEngineCreationContext: EngineCreationContext =
this.engineCreationContext
private def getEngineConnManager: EngineConnManager =
EngineConnManager.getEngineConnManager
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
index c0e919863..11c445447 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
@@ -24,6 +24,11 @@ import org.apache.linkis.manager.engineplugin.common.launch.entity.{
trait EngineConnLaunchBuilder {
- def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineConnLaunchRequest
+ protected var engineConnBuildRequest: EngineConnBuildRequest = _
+
+ def setBuildRequest(engineConnBuildRequest: EngineConnBuildRequest): Unit =
+ this.engineConnBuildRequest = engineConnBuildRequest
+
+ def buildEngineConn(): EngineConnLaunchRequest
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
index 1a5905ac1..ae35b18df 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
+++
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
@@ -52,9 +52,7 @@ trait ProcessEngineConnLaunchBuilder extends EngineConnLaunchBuilder {
engineConnBuildRequest: EngineConnBuildRequest
): util.List[String] = new util.ArrayList[String]
- override def buildEngineConn(
- engineConnBuildRequest: EngineConnBuildRequest
- ): EngineConnLaunchRequest = {
+ override def buildEngineConn(): EngineConnLaunchRequest = {
implicit val engineConnBuildRequestImplicit: EngineConnBuildRequest =
engineConnBuildRequest
val bmlResources = new util.ArrayList[BmlResource]
engineConnBuildRequest match {
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
index 52d830d32..cb3b03fa5 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
@@ -46,7 +46,10 @@ class DefaultEngineConnLaunchService extends EngineConnLaunchService with Loggin
@Autowired
private var engineConnResourceGenerator: EngineConnResourceGenerator = _
- private def getEngineLaunchBuilder(engineTypeLabel: EngineTypeLabel): EngineConnLaunchBuilder = {
+ private def getEngineLaunchBuilder(
+ engineTypeLabel: EngineTypeLabel,
+ engineBuildRequest: EngineConnBuildRequest
+ ): EngineConnLaunchBuilder = {
val engineConnPluginInstance =
EngineConnPluginsLoader.getEngineConnPluginsLoader().getEngineConnPlugin(engineTypeLabel)
val builder = engineConnPluginInstance.plugin.getEngineConnLaunchBuilder
@@ -56,6 +59,7 @@ class DefaultEngineConnLaunchService extends EngineConnLaunchService with Loggin
engineConnResourceGenerator
)
}
+ builder.setBuildRequest(engineBuildRequest)
builder
}
@@ -66,16 +70,14 @@ class DefaultEngineConnLaunchService extends EngineConnLaunchService with Loggin
val engineTypeOption =
engineBuildRequest.labels.asScala.find(_.isInstanceOf[EngineTypeLabel])
if (engineTypeOption.isDefined) {
val engineTypeLabel = engineTypeOption.get.asInstanceOf[EngineTypeLabel]
- Utils.tryCatch(getEngineLaunchBuilder(engineTypeLabel).buildEngineConn(engineBuildRequest)) {
- t =>
- logger.error(
- s"Failed to createEngineConnLaunchRequest(${engineBuildRequest.ticketId})",
- t
- )
- throw new EngineConnPluginErrorException(
- FAILED_CREATE_ELR.getErrorCode,
- s"${FAILED_CREATE_ELR.getErrorDesc}, ${ExceptionUtils.getRootCauseMessage(t)}"
- )
+ Utils.tryCatch(
+ getEngineLaunchBuilder(engineTypeLabel, engineBuildRequest).buildEngineConn()
+ ) { t =>
+ logger.error(s"Failed to createEngineConnLaunchRequest(${engineBuildRequest.ticketId})", t)
+ throw new EngineConnPluginErrorException(
+ FAILED_CREATE_ELR.getErrorCode,
+ s"${FAILED_CREATE_ELR.getErrorDesc}, ${ExceptionUtils.getRootCauseMessage(t)}"
+ )
+ )
}
} else {
throw new EngineConnPluginErrorException(
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
index 7c53cb6f0..b1880bc8e 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
@@ -60,4 +60,9 @@ object EngineConnMode extends Enumeration {
case _ => Unknown
}
+ val ONCE_MODES = Set(Once, Computation_With_Once, Once_With_Cluster)
+
+ def isOnceMode(ecMode: String): Boolean = {
+ ONCE_MODES.contains(toEngineConnMode(ecMode))
+ }
}
\ No newline at end of file
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index 7a3627d3c..b4a66d2f4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.label.utils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{
CodeLanguageLabel,
+ EngineConnModeLabel,
EngineTypeLabel,
UserCreatorLabel
}
@@ -79,6 +80,20 @@ object LabelUtil {
getLabelFromList[CodeLanguageLabel](labels)
}
+ def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel = {
+ getLabelFromList[EngineConnModeLabel](labels)
+ }
+
+ def getEngineConnMode(labels: util.List[Label[_]]): String = {
+ if (null == labels) return null
+ val engineConnModeLabel = getEngineConnModeLabel(labels)
+ if (null != engineConnModeLabel) {
+ engineConnModeLabel.getEngineConnMode
+ } else {
+ null
+ }
+ }
+
def getBindEngineLabel(labels: util.List[Label[_]]): BindEngineLabel = {
getLabelFromList[BindEngineLabel](labels)
}
diff --git
a/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
b/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
index 48e1b2b9a..da827e8b8 100644
---
a/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
+++
b/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
@@ -55,25 +55,25 @@ class OpenLooKengECPlugin extends EngineConnPlugin {
this.defaultLabels.add(engineTypeLabel)
}
- override def getEngineResourceFactory(): EngineResourceFactory = {
+ override def getEngineResourceFactory: EngineResourceFactory = {
if (null == engineResourceFactory) resourceLocker synchronized {
engineResourceFactory = new GenericEngineResourceFactory
}
engineResourceFactory
}
- override def getEngineConnLaunchBuilder(): EngineConnLaunchBuilder = {
- new OpenLooKengProcessECLaunchBuilder;
+ override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
+ new OpenLooKengProcessECLaunchBuilder
}
- override def getEngineConnFactory(): EngineConnFactory = {
+ override def getEngineConnFactory: EngineConnFactory = {
if (null == engineFactory) engineFactoryLocker synchronized {
engineFactory = new OpenLooKengEngineConnFactory
}
engineFactory
}
- override def getDefaultLabels(): util.List[Label[_]] = {
+ override def getDefaultLabels: util.List[Label[_]] = {
this.defaultLabels
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
index 21ddcffb5..3736f0ef0 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
@@ -51,12 +51,7 @@ class SeatunnelEngineConnPlugin extends EngineConnPlugin {
}
override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
- engineLaunchBuilderLocker.synchronized {
- if (null == engineConnLaunchBuilder) {
- engineConnLaunchBuilder = new SeatunnelEngineConnLaunchBuilder()
- }
- engineConnLaunchBuilder
- }
+ new SeatunnelEngineConnLaunchBuilder()
}
override def getEngineConnFactory: EngineConnFactory = {
diff --git a/linkis-engineconn-plugins/spark/pom.xml
b/linkis-engineconn-plugins/spark/pom.xml
index 6966441de..2bb404ed8 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -63,6 +63,18 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-once-engineconn</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-storage</artifactId>
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/ExecutionContext.java
similarity index 72%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/ExecutionContext.java
index c0e919863..d6d62781c 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/ExecutionContext.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.client.context;
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
- EngineConnBuildRequest,
- EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
+public class ExecutionContext {
+ private SparkConfig sparkConfig;
- def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineConnLaunchRequest
+ public SparkConfig getSparkConfig() {
+ return sparkConfig;
+ }
+ public void setSparkConfig(SparkConfig sparkConfig) {
+ this.sparkConfig = sparkConfig;
+ }
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
new file mode 100644
index 000000000..c8f784222
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.context;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkConfig {
+
+ private String javaHome; // ("")
+ private String sparkHome; // ("")
+ private String master = "yarn"; // ("yarn")
+ private String deployMode = "client"; // ("client") // todo cluster
+ private String appResource; // ("")
+ // private String mainClass; // ("")
+ private String appName; // ("")
+ private String jars; // ("--jars", "")
+ private String packages; // ("--packages", "")
+ private String excludePackages; // ("--exclude-packages", "")
+ private String repositories; // ("--repositories", "")
+ private String files; // ("--files", "")
+ private String archives; // ("--archives", "")
+ private Map<String, String> conf = new HashMap<>(); // ("", "")
+ private String propertiesFile; // ("")
+ private String driverMemory; // ("--driver-memory", "")
+ private String driverJavaOptions; // ("--driver-java-options", "")
+ private String driverLibraryPath; // ("--driver-library-path", "")
+ private String driverClassPath; // ("--driver-class-path", "")
+ private String executorMemory; // ("--executor-memory", "")
+ private String proxyUser; // ("--proxy-user", "")
+ private boolean verbose = false; // (false)
+ private Integer driverCores; // ("--driver-cores", "") // Cluster deploy mode only
+ private String totalExecutorCores; // ("--total-executor-cores", "")
+ private Integer executorCores; // ("--executor-cores", "")
+ private Integer numExecutors; // ("--num-executors", "")
+ private String principal; // ("--principal", "")
+ private String keytab; // ("--keytab", "")
+ private String queue; // ("--queue", "")
+
+ public String getJavaHome() {
+ return javaHome;
+ }
+
+ public void setJavaHome(String javaHome) {
+ this.javaHome = javaHome;
+ }
+
+ public String getSparkHome() {
+ return sparkHome;
+ }
+
+ public void setSparkHome(String sparkHome) {
+ this.sparkHome = sparkHome;
+ }
+
+ public String getMaster() {
+ return master;
+ }
+
+ public void setMaster(String master) {
+ this.master = master;
+ }
+
+ public String getDeployMode() {
+ return deployMode;
+ }
+
+ public void setDeployMode(String deployMode) {
+ this.deployMode = deployMode;
+ }
+
+ public String getAppResource() {
+ return appResource;
+ }
+
+ public void setAppResource(String appResource) {
+ this.appResource = appResource;
+ }
+
+ // public String getMainClass() {
+ // return mainClass;
+ // }
+ //
+ // public void setMainClass(String mainClass) {
+ // this.mainClass = mainClass;
+ // }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
+ }
+
+ public String getJars() {
+ return jars;
+ }
+
+ public void setJars(String jars) {
+ this.jars = jars;
+ }
+
+ public String getPackages() {
+ return packages;
+ }
+
+ public void setPackages(String packages) {
+ this.packages = packages;
+ }
+
+ public String getExcludePackages() {
+ return excludePackages;
+ }
+
+ public void setExcludePackages(String excludePackages) {
+ this.excludePackages = excludePackages;
+ }
+
+ public String getRepositories() {
+ return repositories;
+ }
+
+ public void setRepositories(String repositories) {
+ this.repositories = repositories;
+ }
+
+ public String getFiles() {
+ return files;
+ }
+
+ public void setFiles(String files) {
+ this.files = files;
+ }
+
+ public String getArchives() {
+ return archives;
+ }
+
+ public void setArchives(String archives) {
+ this.archives = archives;
+ }
+
+ public Map<String, String> getConf() {
+ return conf;
+ }
+
+ public void setConf(Map<String, String> conf) {
+ this.conf = conf;
+ }
+
+ public String getPropertiesFile() {
+ return propertiesFile;
+ }
+
+ public void setPropertiesFile(String propertiesFile) {
+ this.propertiesFile = propertiesFile;
+ }
+
+ public String getDriverMemory() {
+ return driverMemory;
+ }
+
+ public void setDriverMemory(String driverMemory) {
+ this.driverMemory = driverMemory;
+ }
+
+ public String getDriverJavaOptions() {
+ return driverJavaOptions;
+ }
+
+ public void setDriverJavaOptions(String driverJavaOptions) {
+ this.driverJavaOptions = driverJavaOptions;
+ }
+
+ public String getDriverLibraryPath() {
+ return driverLibraryPath;
+ }
+
+ public void setDriverLibraryPath(String driverLibraryPath) {
+ this.driverLibraryPath = driverLibraryPath;
+ }
+
+ public String getDriverClassPath() {
+ return driverClassPath;
+ }
+
+ public void setDriverClassPath(String driverClassPath) {
+ this.driverClassPath = driverClassPath;
+ }
+
+ public String getExecutorMemory() {
+ return executorMemory;
+ }
+
+ public void setExecutorMemory(String executorMemory) {
+ this.executorMemory = executorMemory;
+ }
+
+ public String getProxyUser() {
+ return proxyUser;
+ }
+
+ public void setProxyUser(String proxyUser) {
+ this.proxyUser = proxyUser;
+ }
+
+ public boolean isVerbose() {
+ return verbose;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ public Integer getDriverCores() {
+ return driverCores;
+ }
+
+ public void setDriverCores(Integer driverCores) {
+ this.driverCores = driverCores;
+ }
+
+ public String getTotalExecutorCores() {
+ return totalExecutorCores;
+ }
+
+ public void setTotalExecutorCores(String totalExecutorCores) {
+ this.totalExecutorCores = totalExecutorCores;
+ }
+
+ public Integer getExecutorCores() {
+ return executorCores;
+ }
+
+ public void setExecutorCores(Integer executorCores) {
+ this.executorCores = executorCores;
+ }
+
+ public Integer getNumExecutors() {
+ return numExecutors;
+ }
+
+ public void setNumExecutors(Integer numExecutors) {
+ this.numExecutors = numExecutors;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public void setPrincipal(String principal) {
+ this.principal = principal;
+ }
+
+ public String getKeytab() {
+ return keytab;
+ }
+
+ public void setKeytab(String keytab) {
+ this.keytab = keytab;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public String toString() {
+ return "SparkConfig{"
+ + "javaHome='"
+ + javaHome
+ + '\''
+ + ", sparkHome='"
+ + sparkHome
+ + '\''
+ + ", master='"
+ + master
+ + '\''
+ + ", deployMode='"
+ + deployMode
+ + '\''
+ +
+ // ", mainClass='" + mainClass + '\'' +
+ ", appName='"
+ + appName
+ + '\''
+ + ", jars='"
+ + jars
+ + '\''
+ + ", packages='"
+ + packages
+ + '\''
+ + ", excludePackages='"
+ + excludePackages
+ + '\''
+ + ", repositories='"
+ + repositories
+ + '\''
+ + ", files='"
+ + files
+ + '\''
+ + ", archives='"
+ + archives
+ + '\''
+ + ", conf="
+ + conf
+ + ", propertiesFile='"
+ + propertiesFile
+ + '\''
+ + ", driverMemory='"
+ + driverMemory
+ + '\''
+ + ", driverJavaOptions='"
+ + driverJavaOptions
+ + '\''
+ + ", driverLibraryPath='"
+ + driverLibraryPath
+ + '\''
+ + ", driverClassPath='"
+ + driverClassPath
+ + '\''
+ + ", executorMemory='"
+ + executorMemory
+ + '\''
+ + ", proxyUser='"
+ + proxyUser
+ + '\''
+ + ", verbose="
+ + verbose
+ + ", driverCores="
+ + driverCores
+ + ", totalExecutorCores='"
+ + totalExecutorCores
+ + '\''
+ + ", executorCores="
+ + executorCores
+ + ", numExecutors="
+ + numExecutors
+ + ", principal='"
+ + principal
+ + '\''
+ + ", keytab='"
+ + keytab
+ + '\''
+ + ", queue='"
+ + queue
+ + '\''
+ + '}';
+ }
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
new file mode 100644
index 000000000..4607f2e4b
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.Closeable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ClusterDescriptorAdapter implements Closeable {
+ protected static final Logger logger = LoggerFactory.getLogger(ClusterDescriptorAdapter.class);
+
+ protected final ExecutionContext executionContext;
+ protected String applicationId;
+ protected SparkLauncher sparkLauncher;
+ protected SparkAppHandle sparkAppHandle;
+ protected SparkAppHandle.State jobState;
+
+ public String getApplicationId() {
+ return applicationId;
+ }
+
+ public ClusterDescriptorAdapter(ExecutionContext executionContext) {
+ this.executionContext = executionContext;
+ }
+
+ /** Returns the state of the spark job. */
+ public SparkAppHandle.State getJobState() {
+ return jobState;
+ }
+
+ /** Cancel the spark job. */
+ public void cancelJob() {
+ if (sparkAppHandle != null) {
+ logger.info("Start to cancel job {}.", sparkAppHandle.getAppId());
+ this.stopJob();
+ } else {
+ logger.warn("Cancel job: sparkAppHandle is null");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterDescriptorAdapter{" + "applicationId=" +
sparkAppHandle.getAppId() + '}';
+ }
+
+ @Override
+ public void close() {
+ if (sparkAppHandle != null) {
+ logger.info("Start to close job {}.", sparkAppHandle.getAppId());
+ this.stopJob();
+ } else {
+ logger.warn("Close job: sparkAppHandle is null");
+ }
+ }
+
+ private void stopJob() {
+ if (sparkAppHandle == null) {
+ return;
+ }
+ if (sparkAppHandle.getState().isFinal()) {
+ logger.info("Job has finished, stop action return.");
+ return;
+ }
+ try {
+ logger.info("Try to stop job {}.", sparkAppHandle.getAppId());
+ sparkAppHandle.stop();
+ } catch (Exception e) {
+ logger.error("Stop job failed.", e);
+ try {
+ logger.info("Try to kill job {}.", sparkAppHandle.getAppId());
+ sparkAppHandle.kill();
+ } catch (Exception ex) {
+ logger.error("Kill job failed.", ex);
+ }
+ }
+ }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
similarity index 64%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
index c0e919863..8e9540dcb 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.client.deployment;
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
- EngineConnBuildRequest,
- EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
- def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineConnLaunchRequest
+public class ClusterDescriptorAdapterFactory {
+ public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
+ ClusterDescriptorAdapter clusterDescriptorAdapter =
+ new YarnApplicationClusterDescriptorAdapter(executionContext);
+ return clusterDescriptorAdapter;
+ }
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
new file mode 100644
index 000000000..1cdfb374e
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
+
+ public YarnApplicationClusterDescriptorAdapter(ExecutionContext executionContext) {
+ super(executionContext);
+ }
+
+ public void deployCluster(String mainClass, String args, Map<String, String> confMap)
+ throws IOException, InterruptedException {
+ SparkConfig sparkConfig = executionContext.getSparkConfig();
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ SparkAppHandle.Listener listener =
+ new SparkAppHandle.Listener() {
+ @Override
+ public void stateChanged(SparkAppHandle sparkAppHandle) {
+ jobState = sparkAppHandle.getState();
+ if (sparkAppHandle.getAppId() != null) {
+ countDownLatch.countDown();
+ applicationId = sparkAppHandle.getAppId();
+ logger.info("{} stateChanged: {}", applicationId,
jobState.toString());
+ } else {
+ if (jobState.isFinal()) {
+ countDownLatch.countDown();
+ }
+ logger.info("stateChanged: {}", jobState.toString());
+ }
+ }
+
+ @Override
+ public void infoChanged(SparkAppHandle sparkAppHandle) {
+ jobState = sparkAppHandle.getState();
+ if (sparkAppHandle.getAppId() != null) {
+ logger.info("{} infoChanged: {}", sparkAppHandle.getAppId(),
jobState.toString());
+ } else {
+ logger.info("infoChanged: {}", jobState.toString());
+ }
+ }
+ };
+
+ sparkLauncher = new SparkLauncher();
+ // region set args
+ sparkLauncher
+ .setJavaHome(sparkConfig.getJavaHome())
+ .setSparkHome(sparkConfig.getSparkHome())
+ .setMaster(sparkConfig.getMaster())
+ .setDeployMode(sparkConfig.getDeployMode())
+ .setAppName(sparkConfig.getAppName())
+ // .setPropertiesFile("")
+ .setVerbose(true);
+ sparkLauncher.setConf("spark.app.name", sparkConfig.getAppName());
+ if (confMap != null) confMap.forEach((k, v) -> sparkLauncher.setConf(k, v));
+ addSparkArg(sparkLauncher, "--jars", sparkConfig.getJars());
+ addSparkArg(sparkLauncher, "--packages", sparkConfig.getPackages());
+ addSparkArg(sparkLauncher, "--exclude-packages", sparkConfig.getExcludePackages());
+ addSparkArg(sparkLauncher, "--repositories", sparkConfig.getRepositories());
+ addSparkArg(sparkLauncher, "--files", sparkConfig.getFiles());
+ addSparkArg(sparkLauncher, "--archives", sparkConfig.getArchives());
+ addSparkArg(sparkLauncher, "--driver-memory", sparkConfig.getDriverMemory());
+ addSparkArg(sparkLauncher, "--driver-java-options", sparkConfig.getDriverJavaOptions());
+ addSparkArg(sparkLauncher, "--driver-library-path", sparkConfig.getDriverLibraryPath());
+ addSparkArg(sparkLauncher, "--driver-class-path", sparkConfig.getDriverClassPath());
+ addSparkArg(sparkLauncher, "--executor-memory", sparkConfig.getExecutorMemory());
+ addSparkArg(sparkLauncher, "--proxy-user", sparkConfig.getProxyUser());
+ addSparkArg(sparkLauncher, "--driver-cores", sparkConfig.getDriverCores().toString());
+ addSparkArg(sparkLauncher, "--total-executor-cores", sparkConfig.getTotalExecutorCores());
+ addSparkArg(sparkLauncher, "--executor-cores", sparkConfig.getExecutorCores().toString());
+ addSparkArg(sparkLauncher, "--num-executors", sparkConfig.getNumExecutors().toString());
+ addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
+ addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
+ addSparkArg(sparkLauncher, "--queue", sparkConfig.getQueue());
+ sparkLauncher.setAppResource(sparkConfig.getAppResource());
+ sparkLauncher.setMainClass(mainClass);
+ Arrays.stream(args.split("\\s+"))
+ .filter(StringUtils::isNotBlank)
+ .forEach(arg -> sparkLauncher.addAppArgs(arg));
+ // sparkLauncher.addAppArgs(args);
+ // endregion
+ sparkAppHandle = sparkLauncher.startApplication(listener);
+ countDownLatch.await();
+ }
+
+ private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) {
+ if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+ sparkLauncher.addSparkArg(key, value);
+ }
+ }
+
+ public boolean initJobId() {
+ return null != getApplicationId();
+ }
+}
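A rough usage sketch of the adapter added above (illustrative only, not part of the commit): it wires SparkConfig into an ExecutionContext, creates the adapter through ClusterDescriptorAdapterFactory, and calls deployCluster(). Paths and resource values are placeholders; note that driver/executor cores and executor count should be set, because deployCluster() calls toString() on them.

    // Sketch: run inside a method that declares throws IOException, InterruptedException.
    SparkConfig sparkConfig = new SparkConfig();
    sparkConfig.setJavaHome("/usr/local/java");   // placeholder path
    sparkConfig.setSparkHome("/usr/local/spark"); // placeholder path
    sparkConfig.setMaster("yarn");
    sparkConfig.setDeployMode("cluster");
    sparkConfig.setAppName("spark-submit-jar-test");
    sparkConfig.setAppResource("hdfs:///spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar");
    sparkConfig.setDriverCores(1);     // required: deployCluster() calls getDriverCores().toString()
    sparkConfig.setExecutorCores(1);   // required for the same reason
    sparkConfig.setNumExecutors(1);    // required for the same reason

    ExecutionContext context = new ExecutionContext();
    context.setSparkConfig(sparkConfig);

    YarnApplicationClusterDescriptorAdapter adapter =
        (YarnApplicationClusterDescriptorAdapter) ClusterDescriptorAdapterFactory.create(context);
    // Blocks until YARN reports an applicationId or the launch reaches a final state.
    adapter.deployCluster("org.apache.spark.examples.JavaWordCount", "hdfs:///tmp/log.log", null);
    System.out.println("applicationId=" + adapter.getApplicationId() + ", state=" + adapter.getJobState());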
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
index 084b7bcae..96c5f4dd3 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
@@ -55,6 +55,13 @@ public enum SparkErrorCodeSummary {
"{0} requires that the data to be inserted have the same number of
columns as the target table: target table has {1} column(s) but the inserted
data has {2} column(s)"),
DATA_CALC_FIELD_NOT_EXIST(43023, "{0} columns({1}) are not exist in source
columns"),
DATA_CALC_VARIABLE_NOT_EXIST(43024, "Please set [{0}] in variables"),
+
+ NOT_SUPPORT_ADAPTER(43031, "Not support Adapter for spark application."),
+
+ YARN_APPLICATION_START_FAILED(
+ 43032, "The application start failed, since yarn applicationId is
null."),
+
+ NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."),
;
/** (errorCode)错误码 */
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/ExecutorInitException.java
similarity index 72%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/ExecutorInitException.java
index c0e919863..bc3180d54 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/ExecutorInitException.java
@@ -15,15 +15,12 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.exception;
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
- EngineConnBuildRequest,
- EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
-
- def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineConnLaunchRequest
+import org.apache.linkis.common.exception.ErrorException;
+public class ExecutorInitException extends ErrorException {
+ public ExecutorInitException(int errCode, String desc) {
+ super(errCode, desc);
+ }
}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/JobExecutionException.java
similarity index 72%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/JobExecutionException.java
index c0e919863..dfc70a283 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/JobExecutionException.java
@@ -15,15 +15,12 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.exception;
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
- EngineConnBuildRequest,
- EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
-
- def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineConnLaunchRequest
+import org.apache.linkis.common.exception.ErrorException;
+public class JobExecutionException extends ErrorException {
+ public JobExecutionException(int errCode, String desc) {
+ super(errCode, desc);
+ }
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
index 06052244f..3de8a6512 100644
---
a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
+++
b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
@@ -26,4 +26,6 @@
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.spark.
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook
+linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088
+
wds.linkis.fs.hdfs.impl.disable.cache=true
\ No newline at end of file
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
index 67cfb49ad..0ef8ba81b 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
@@ -21,13 +21,13 @@ import org.apache.linkis.engineplugin.spark.factory.{
SparkEngineConnFactory,
SparkEngineConnResourceFactory
}
-import org.apache.linkis.engineplugin.spark.launch.SparkSubmitProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.spark.launch.SparkEngineConnLaunchBuilder
import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory
import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineType, EngineTypeLabel}
+import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
import java.util
@@ -61,7 +61,7 @@ class SparkEngineConnPlugin extends EngineConnPlugin {
}
override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
- SparkSubmitProcessEngineConnLaunchBuilder.newBuilder()
+ new SparkEngineConnLaunchBuilder()
}
override def getEngineConnFactory: EngineConnFactory = {
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index 20cc9006a..c4370c7eb 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -27,6 +27,9 @@ import java.io.File
*/
object SparkConfiguration extends Logging {
+ val SPARK_HOME_ENV = "SPARK_HOME"
+ val SPARK_CONF_DIR_ENV = "SPARK_CONF_DIR"
+
val PROCESS_MAX_THREADS =
CommonVars[Int]("wds.linkis.process.threadpool.max", 100)
val SPARK_SESSION_HOOK =
CommonVars[String]("wds.linkis.engine.spark.session.hook", "")
@@ -39,11 +42,13 @@ object SparkConfiguration extends Logging {
val SPARK_REPL_CLASSDIR = CommonVars[String]("spark.repl.classdir", "",
"默认master")
- val PROXY_USER = CommonVars[String]("spark.proxy.user", "${UM}")
+ val PROXY_USER = CommonVars[String]("spark.proxy.user", "")
val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode",
"client")
val SPARK_APP_NAME = CommonVars[String]("spark.app.name",
"Linkis-EngineConn-Spark")
+ val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "")
+ val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "")
val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version",
"python")
@@ -121,6 +126,17 @@ object SparkConfiguration extends Logging {
val REPLACE_PACKAGE_TO_HEADER = "org.apache.linkis"
+ val SPARK_APPLICATION_ARGS = CommonVars("spark.app.args", "")
+ val SPARK_APPLICATION_MAIN_CLASS = CommonVars("spark.app.main.class", "")
+
+ val SPARK_ONCE_APP_STATUS_FETCH_INTERVAL =
+ CommonVars("linkis.spark.once.app.fetch.status.interval", new
TimeType("5s"))
+
+ val SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX =
+ CommonVars("linkis.spark.once.app.fetch.status.failed.num", 3)
+
+ val SPARK_ONCE_YARN_RESTFUL_URL =
CommonVars[String]("linkis.spark.once.yarn.restful.url", "")
+
private def getMainJarName(): String = {
val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory])
if (somePath.isDefined) {
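(For reference, not part of the patch: a minimal sketch of the job properties a client could pass for a
once-mode jar submission, built from the keys defined above; the class name, jar path and YARN address
are illustrative placeholders.)

    val onceJobProperties = Map(
      "spark.app.main.class" -> "org.example.MySparkApp", // placeholder main class
      "spark.app.args" -> "--run-date 2022-12-20", // placeholder application arguments
      "spark.app.resource" -> "hdfs:///tmp/linkis/my-spark-app.jar", // placeholder jar on HDFS
      "linkis.spark.once.yarn.restful.url" -> "http://yarn-resourcemanager:8088" // placeholder YARN RM address
    )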
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/EnvironmentContext.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/EnvironmentContext.scala
new file mode 100644
index 000000000..153a72632
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/EnvironmentContext.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.context
+
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig
+
+import java.net.URL
+import java.util
+import java.util.Objects
+
+class EnvironmentContext(
+ yarnConfDir: String,
+ sparkConfDir: String,
+ sparkHome: String,
+ dependencies: util.List[URL]
+) {
+
+ private var sparkConfig: SparkConfig = _
+
+ def this(
+ sparkConfig: SparkConfig,
+ yarnConfDir: String,
+ sparkConfDir: String,
+ sparkHome: String,
+ dependencies: util.List[URL]
+ ) {
+ this(yarnConfDir, sparkConfDir, sparkHome, dependencies)
+ this.sparkConfig = sparkConfig
+ }
+
+ def getYarnConfDir: String = yarnConfDir
+
+ def getSparkConfDir: String = sparkConfDir
+
+ def getSparkHome: String = sparkHome
+
+ def getSparkConfig: SparkConfig = sparkConfig
+
+ def getDependencies: util.List[URL] = dependencies
+
+ override def equals(o: Any): Boolean = o match {
+ case context: EnvironmentContext =>
+ if (this eq context) return true
+ Objects.equals(dependencies, context.getDependencies) &&
+ Objects.equals(sparkConfig, context.sparkConfig)
+ case _ => false
+ }
+
+ override def hashCode: Int = Objects.hash(dependencies, sparkConfig)
+}
diff --git
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/SparkEngineConnContext.scala
similarity index 61%
copy from
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/SparkEngineConnContext.scala
index c0e919863..4b1e6a2eb 100644
---
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/SparkEngineConnContext.scala
@@ -15,15 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.context
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
- EngineConnBuildRequest,
- EngineConnLaunchRequest
-}
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext
+
+class SparkEngineConnContext(environmentContext: EnvironmentContext) {
+
+ private var executionContext: ExecutionContext = _
+
+ def getEnvironmentContext: EnvironmentContext = environmentContext
-trait EngineConnLaunchBuilder {
+ def getExecutionContext: ExecutionContext = executionContext
- def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): EngineConnLaunchRequest
+ def setExecutionContext(executionContext: ExecutionContext): Unit = this.executionContext =
+ executionContext
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkExecutor.scala
new file mode 100644
index 000000000..dad6ef2ed
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkExecutor.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.engineconn.executor.entity.{LabelExecutor, ResourceExecutor, YarnExecutor}
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.engineplugin.spark.exception.JobExecutionException
+import org.apache.linkis.manager.common.entity.resource._
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+trait SparkExecutor extends YarnExecutor with LabelExecutor with ResourceExecutor {
+
+ private var applicationId: String = _
+ private var yarnMode: String = "Client"
+ private var queue: String = _
+
+ private var executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]
+
+ override def getApplicationId: String = applicationId
+
+ def setApplicationId(applicationId: String): Unit = this.applicationId = applicationId
+
+ override def getYarnMode: String = yarnMode
+
+ def setYarnMode(yarnMode: String): Unit = this.yarnMode = yarnMode
+
+ override def getQueue: String = queue
+
+ def setQueue(queue: String): Unit = this.queue = queue
+
+ override def getExecutorLabels(): util.List[Label[_]] = executorLabels
+
+ override def setExecutorLabels(labels: util.List[Label[_]]): Unit = this.executorLabels = labels
+
+ override def requestExpectedResource(expectedResource: NodeResource): NodeResource =
+ throw new JobExecutionException(
+ SparkErrorCodeSummary.NOT_SUPPORT_METHOD.getErrorCode,
+ SparkErrorCodeSummary.NOT_SUPPORT_METHOD.getErrorDesc
+ )
+
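+ // The YARN queue is taken from the SparkConfig carried in the EngineConn context, so resource
+ // reporting matches the queue the application was actually submitted to.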
+ protected val sparkEngineConnContext: SparkEngineConnContext
+
+ queue = sparkEngineConnContext.getEnvironmentContext.getSparkConfig.getQueue
+
+ def supportCallBackLogs(): Boolean = true
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
new file mode 100644
index 000000000..faa90b03a
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.once.executor.{
+ ManageableOnceExecutor,
+ OnceExecutorExecutionContext
+}
+import org.apache.linkis.engineplugin.spark.client.deployment.{
+ ClusterDescriptorAdapter,
+ ClusterDescriptorAdapterFactory
+}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+ SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX,
+ SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
+}
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.engineplugin.spark.exception.ExecutorInitException
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+import org.apache.spark.launcher.SparkAppHandle
+
+import java.util.concurrent.{Future, TimeUnit}
+
+import scala.collection.convert.WrapAsScala._
+
+trait SparkOnceExecutor[T <: ClusterDescriptorAdapter]
+ extends ManageableOnceExecutor
+ with SparkExecutor {
+
+ protected var clusterDescriptorAdapter: T = _
+ private var daemonThread: Future[_] = _
+
+ protected def submit(onceExecutorExecutionContext: OnceExecutorExecutionContext): Unit = {
+ ClusterDescriptorAdapterFactory.create(sparkEngineConnContext.getExecutionContext) match {
+ case adapter: T => clusterDescriptorAdapter = adapter
+ case _ =>
+ throw new ExecutorInitException(
+ SparkErrorCodeSummary.NOT_SUPPORT_ADAPTER.getErrorCode,
+ SparkErrorCodeSummary.NOT_SUPPORT_ADAPTER.getErrorDesc
+ )
+ }
+ val options = onceExecutorExecutionContext.getOnceExecutorContent.getJobContent.map {
+ case (k, v: String) => k -> v
+ case (k, v) if v != null => k -> v.toString
+ case (k, _) => k -> null
+ }.toMap
+ doSubmit(onceExecutorExecutionContext, options)
+ if (isCompleted) return
+ if (null == clusterDescriptorAdapter.getApplicationId)
+ throw new ExecutorInitException(
+ SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorCode,
+ SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorDesc
+ )
+ setApplicationId(clusterDescriptorAdapter.getApplicationId)
+ logger.info(s"Application is started, applicationId: $getApplicationId.")
+ }
+
+ protected def isCompleted: Boolean = isClosed || NodeStatus.isCompleted(getStatus)
+
+ def doSubmit(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext,
+ options: Map[String, String]
+ ): Unit
+
+ val id: Long
+
+ override def getId: String = "SparkOnceApp_" + id
+
+ protected def closeDaemon(): Unit = {
+ if (daemonThread != null) daemonThread.cancel(true)
+ }
+
+ override def close(): Unit = {
+ super.close()
+ closeDaemon()
+ if (clusterDescriptorAdapter != null) {
+ clusterDescriptorAdapter.close()
+ }
+ }
+
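+ // Polls the application state every SPARK_ONCE_APP_STATUS_FETCH_INTERVAL; after
+ // SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX consecutive fetch failures the executor is marked
+ // failed and closed, and FAILED/KILLED/LOST/FINISHED states end the once job.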
+ override protected def waitToRunning(): Unit = {
+ var waitingToFinished = false
+ if (!isCompleted)
+ daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
+ new Runnable {
+ private var lastStatus: SparkAppHandle.State = _
+ private var lastPrintTime = 0L
+ private val printInterval =
+ math.max(SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong, 5 * 60 * 1000)
+ private var fetchJobStatusFailedNum = 0
+
+ override def run(): Unit = if (!isCompleted && !waitingToFinished) {
+ val jobState = Utils.tryCatch(clusterDescriptorAdapter.getJobState) { t =>
+ val maxFailedNum = SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue
+ if (fetchJobStatusFailedNum >= maxFailedNum) {
+ val errMsg =
+ s"Fetch job status has failed max $maxFailedNum times, now stop this SparkEngineConn."
+ logger.error(errMsg, t)
+ tryFailed()
+ close()
+ } else {
+ fetchJobStatusFailedNum += 1
+ logger.error(s"Fetch job status failed! retried
++$fetchJobStatusFailedNum...", t)
+ }
+ return
+ }
+
+ fetchJobStatusFailedNum = 0
+ if (
+ jobState != lastStatus || System.currentTimeMillis - lastPrintTime >= printInterval
+ ) {
+ logger.info(s"The jobState of $getApplicationId is $jobState.")
+ lastPrintTime = System.currentTimeMillis
+ }
+ lastStatus = jobState
+ if (SparkAppHandle.State.FINISHED == lastStatus) {
+ waitingToFinished = true
+ logger.info("Job has finished, waiting for final status.")
+ Thread.sleep(5000)
+ logger.info(s"Job's final status
${clusterDescriptorAdapter.getJobState}.")
+ }
+ clusterDescriptorAdapter.getJobState match {
+ case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED | SparkAppHandle.State.LOST =>
+ tryFailed()
+ case SparkAppHandle.State.FINISHED =>
+ trySucceed()
+ case _ =>
+ }
+ waitingToFinished = false
+ }
+ },
+ SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
+ SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
+ TimeUnit.MILLISECONDS
+ )
+ }
+
+ override def supportCallBackLogs(): Boolean = true
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
new file mode 100644
index 000000000..8cec44d00
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.utils.{ByteTimeUtils, Utils}
+import org.apache.linkis.engineconn.once.executor.{
+ OnceExecutorExecutionContext,
+ OperableOnceExecutor
+}
+import org.apache.linkis.engineplugin.spark.client.deployment.YarnApplicationClusterDescriptorAdapter
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+ SPARK_APP_CONF,
+ SPARK_APPLICATION_ARGS,
+ SPARK_APPLICATION_MAIN_CLASS
+}
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.utils.SparkJobProgressUtil
+import org.apache.linkis.manager.common.entity.resource._
+import org.apache.linkis.manager.common.utils.ResourceUtils
+import org.apache.linkis.protocol.engine.JobProgressInfo
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+import scala.concurrent.duration.Duration
+
+class SparkSubmitOnceExecutor(
+ override val id: Long,
+ override protected val sparkEngineConnContext: SparkEngineConnContext
+) extends SparkOnceExecutor[YarnApplicationClusterDescriptorAdapter]
+ with OperableOnceExecutor {
+
+ private var oldProgress: Float = 0f
+
+ override def doSubmit(
+ onceExecutorExecutionContext: OnceExecutorExecutionContext,
+ options: Map[String, String]
+ ): Unit = {
+ val args = SPARK_APPLICATION_ARGS.getValue(options)
+ val mainClass = SPARK_APPLICATION_MAIN_CLASS.getValue(options)
+ val extConf = SPARK_APP_CONF.getValue(options)
+ val confMap = new util.HashMap[String, String]()
+ if (StringUtils.isNotBlank(extConf)) {
+ for (conf <- extConf.split("\n")) {
+ if (StringUtils.isNotBlank(conf)) {
+ val pair = conf.trim.split("=")
+ if (pair.length == 2) {
+ confMap.put(pair(0), pair(1))
+ } else {
+ logger.warn(s"ignore spark conf: $conf")
+ }
+ }
+ }
+ }
+ logger.info(s"${System.getenv("YARN_RESTFUL_URL")}")
+ logger.info(s"Ready to submit spark application, mainClass: $mainClass,
args: $args.")
+ clusterDescriptorAdapter.deployCluster(mainClass, args, confMap)
+ }
+
+ override protected def waitToRunning(): Unit = {
+ Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf)
+ super.waitToRunning()
+ }
+
+ override def getApplicationURL: String = ""
+
+ override def getCurrentNodeResource(): NodeResource = {
+ logger.info("Begin to get actual used resources!")
+ Utils.tryCatch({
+ val sparkConf = sparkEngineConnContext.getExecutionContext.getSparkConfig
+ val executorNum: Int = sparkConf.getNumExecutors
+ val executorMem: Long =
+ ByteTimeUtils.byteStringAsBytes(sparkConf.getExecutorMemory) * executorNum
+
+ val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sparkConf.getDriverMemory)
+ val sparkExecutorCores = sparkConf.getExecutorCores * executorNum
+ val sparkDriverCores = sparkConf.getDriverCores
+ val queue = sparkConf.getQueue
+ logger.info(
+ "Current actual used resources is driverMem:" + driverMem + ",driverCores:" + sparkDriverCores +
+ ",executorMem:" + executorMem + ",executorCores:" + sparkExecutorCores + ",queue:" + queue
+ )
+ val usedResource = new DriverAndYarnResource(
+ new LoadInstanceResource(driverMem, sparkDriverCores, 1),
+ new YarnResource(executorMem, sparkExecutorCores, 0, queue)
+ )
+ val nodeResource = new CommonNodeResource
+ nodeResource.setUsedResource(usedResource)
+ nodeResource.setResourceType(ResourceUtils.getResourceTypeByResource(usedResource))
+ nodeResource
+ })(t => {
+ logger.warn("Get actual used resource exception", t)
+ null
+ })
+ }
+
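+ // Progress is read from the YARN / Spark REST APIs as completed tasks over total tasks and is
+ // kept monotonic: once a value has been reported, a smaller value is never returned.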
+ override def getProgress: Float = {
+ if (clusterDescriptorAdapter == null) {
+ logger.info("clusterDescriptorAdapter is null")
+ } else if (clusterDescriptorAdapter.getJobState == null) {
+ logger.info("clusterDescriptorAdapter.getJobState is null")
+ } else {
+ logger.info("clusterDescriptorAdapter/getJobState is not null")
+ }
+ val jobIsFinal = clusterDescriptorAdapter != null &&
+ clusterDescriptorAdapter.getJobState != null &&
+ clusterDescriptorAdapter.getJobState.isFinal
+ if (oldProgress >= 1 || jobIsFinal) {
+ 1
+ } else {
+ val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId)
+ if (newProgress > oldProgress) {
+ oldProgress = newProgress
+ }
+ oldProgress
+ }
+ }
+
+ override def getProgressInfo: Array[JobProgressInfo] =
+ SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId)
+
+ override def getMetrics: util.Map[String, Any] = {
+ new util.HashMap[String, Any]()
+ }
+
+ override def getDiagnosis: util.Map[String, Any] = {
+ new util.HashMap[String, Any]()
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index d45f8b6ac..ec3b697ea 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -18,19 +18,27 @@
package org.apache.linkis.engineplugin.spark.factory
import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.{JsonUtils, Logging, Utils}
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineplugin.spark.client.context.{ExecutionContext, SparkConfig}
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration._
+import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
+import org.apache.linkis.engineplugin.spark.context.{EnvironmentContext, SparkEngineConnContext}
import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
import org.apache.linkis.engineplugin.spark.exception.{
SparkCreateFileException,
SparkSessionNullException
}
+import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
MultiExecutorEngineConnFactory
}
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.server.JMap
@@ -51,8 +59,72 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
override protected def createEngineConnSession(
engineCreationContext: EngineCreationContext
): Any = {
+ if (EngineConnServer.isOnceMode) {
+ createSparkOnceEngineConnContext(engineCreationContext)
+ } else {
+ createSparkEngineSession(engineCreationContext)
+ }
+ }
+
+ def createSparkOnceEngineConnContext(
+ engineCreationContext: EngineCreationContext
+ ): SparkEngineConnContext = {
+ val environmentContext = createEnvironmentContext(engineCreationContext)
+ val sparkEngineConnContext = new SparkEngineConnContext(environmentContext)
+ val executionContext =
+ createExecutionContext(engineCreationContext.getOptions, environmentContext)
+ sparkEngineConnContext.setExecutionContext(executionContext)
+ sparkEngineConnContext
+ }
+
+ protected def createEnvironmentContext(
+ engineCreationContext: EngineCreationContext
+ ): EnvironmentContext = {
+ val options = engineCreationContext.getOptions
+ val hadoopConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(options)
+ val sparkHome = SPARK_HOME.getValue(options)
+ val sparkConfDir = SPARK_CONF_DIR.getValue(options)
+ val sparkConfig: SparkConfig = getSparkConfig(options)
+ val context = new EnvironmentContext(sparkConfig, hadoopConfDir, sparkConfDir, sparkHome, null)
+ context
+ }
+
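+ // Builds the SparkConfig for once mode from the engine creation options: master, deploy mode,
+ // application jar, memory, cores, executor count, queue and proxy user.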
+ def getSparkConfig(options: util.Map[String, String]): SparkConfig = {
+ logger.info("options: " + JsonUtils.jackson.writeValueAsString(options))
+ val sparkConfig: SparkConfig = new SparkConfig()
+ sparkConfig.setJavaHome(variable(Environment.JAVA_HOME))
+ sparkConfig.setSparkHome(SPARK_HOME.getValue(options))
+ sparkConfig.setMaster(SPARK_MASTER.getValue(options))
+ sparkConfig.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
+ sparkConfig.setAppResource(SPARK_APP_RESOURCE.getValue(options))
+ sparkConfig.setAppName(SPARK_APP_NAME.getValue(options))
+ sparkConfig.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo
+ sparkConfig.setDriverMemory(LINKIS_SPARK_DRIVER_MEMORY.getValue(options))
+ sparkConfig.setDriverJavaOptions(SPARK_DRIVER_EXTRA_JAVA_OPTIONS.getValue(options))
+ sparkConfig.setDriverClassPath(SPARK_DRIVER_CLASSPATH.getValue(options))
+ sparkConfig.setExecutorMemory(LINKIS_SPARK_EXECUTOR_MEMORY.getValue(options))
+ sparkConfig.setProxyUser(PROXY_USER.getValue(options))
+ sparkConfig.setDriverCores(LINKIS_SPARK_DRIVER_CORES.getValue(options))
+ sparkConfig.setExecutorCores(LINKIS_SPARK_EXECUTOR_CORES.getValue(options))
+ sparkConfig.setNumExecutors(LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(options))
+ sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
+
+ logger.info(s"spark_info: ${sparkConfig}")
+ sparkConfig
+ }
+
+ def createExecutionContext(
+ options: util.Map[String, String],
+ environmentContext: EnvironmentContext
+ ): ExecutionContext = {
+ val context = new ExecutionContext()
+ context.setSparkConfig(environmentContext.getSparkConfig)
+ // todo
+ context
+ }
+
+ def createSparkEngineSession(engineCreationContext: EngineCreationContext): SparkEngineSession = {
val options = engineCreationContext.getOptions
- val useSparkSubmit = true
val sparkConf: SparkConf = new SparkConf(true)
val master =
sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", "yarn").getValue)
@@ -188,7 +260,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
new SparkSqlExecutorFactory,
new SparkPythonExecutorFactory,
new SparkScalaExecutorFactory,
- new SparkDataCalcExecutorFactory
+ new SparkDataCalcExecutorFactory,
+ new SparkOnceExecutorFactory
)
override def getExecutorFactories: Array[ExecutorFactory] = {
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
new file mode 100644
index 000000000..25e264944
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.once.executor.OnceExecutor
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.executor.SparkSubmitOnceExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+class SparkOnceExecutorFactory extends OnceExecutorFactory {
+
+ override protected def newExecutor(
+ id: Int,
+ engineCreationContext: EngineCreationContext,
+ engineConn: EngineConn,
+ labels: Array[Label[_]]
+ ): OnceExecutor =
+ engineConn.getEngineConnSession match {
+ case context: SparkEngineConnContext =>
+ new SparkSubmitOnceExecutor(id, context)
+ }
+
+ override protected def getRunType: RunType = RunType.JAR
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala
new file mode 100644
index 000000000..8be047e2f
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.launch
+
+import org.apache.linkis.common.utils.JsonUtils
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+ SPARK_CONF_DIR_ENV,
+ SPARK_HOME_ENV
+}
+import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration
+import org.apache.linkis.hadoop.common.conf.HadoopConf
+import org.apache.linkis.manager.common.protocol.bml.BmlResource
+import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
+import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{variable, USER}
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.addPathToClassPath
+import org.apache.linkis.manager.label.entity.engine.{
+ EngineConnMode,
+ EngineConnModeLabel,
+ UserCreatorLabel
+}
+import org.apache.linkis.manager.label.utils.LabelUtil
+
+import java.util
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+import com.google.common.collect.Lists
+
+class SparkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
+
+ override protected def getCommands(implicit
+ engineConnBuildRequest: EngineConnBuildRequest
+ ): Array[String] = {
+ if (isOnceMode) {
+ val properties = engineConnBuildRequest.engineConnCreationDesc.properties
+ properties.put(
+ EnvConfiguration.ENGINE_CONN_MEMORY.key,
+ SparkResourceConfiguration.LINKIS_SPARK_DRIVER_MEMORY.getValue(properties)
+ )
+ super.getCommands
+ } else {
+ new SparkSubmitProcessEngineConnLaunchBuilder(this).getCommands(
+ engineConnBuildRequest,
+ getMainClass,
+ getGcLogDir(engineConnBuildRequest),
+ getLogDir(engineConnBuildRequest)
+ )
+ }
+ }
+
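+ // Once mode is detected from the EngineConnModeLabel on the build request; once jobs reuse the
+ // generic Java process command from the parent builder, while other modes still go through
+ // SparkSubmitProcessEngineConnLaunchBuilder to build a spark-submit command line.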
+ def isOnceMode: Boolean = {
+ val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
+ EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
+ }
+
+ override def getEnvironment(implicit
+ engineConnBuildRequest: EngineConnBuildRequest
+ ): util.Map[String, String] = {
+ val environment = super.getEnvironment
+ if (isOnceMode)
+ addPathToClassPath(environment, s"$$$SPARK_HOME_ENV/jars/*")
+ environment
+ }
+
+ override protected def getBmlResources(implicit
+ engineConnBuildRequest: EngineConnBuildRequest
+ ): java.util.List[BmlResource] = {
+ if (!isOnceMode) return super.getBmlResources
+ val bmlResources = new java.util.ArrayList[BmlResource](super.getBmlResources)
+ val properties = engineConnBuildRequest.engineConnCreationDesc.properties
+ val userName = engineConnBuildRequest.labels.asScala
+ .find(_.isInstanceOf[UserCreatorLabel])
+ .map { case label: UserCreatorLabel => label.getUser }
+ .get
+
+ def getBmlString(bml: BmlResource): String = {
+ s"BmlResource(${bml.getFileName}, ${bml.getResourceId},
${bml.getVersion})"
+ }
+
+ val ticketId = engineConnBuildRequest.ticketId
+ properties.get("spark.app.main.class.jar.bml.json") match {
+ case mainClassJarContent: String =>
+ val bml = contentToBmlResource(userName, mainClassJarContent)
+ logger.info(s"Add a ${getBmlString(bml)} for user $userName and
ticketId $ticketId")
+ bmlResources.add(bml)
+ properties.remove("spark.app.main.class.jar.bml.json")
+ case _ =>
+ }
+ properties.get("spark.app.user.class.path.bml.json") match {
+ case classpathContent: String =>
+ val contentList = JsonUtils.jackson.readValue(
+ classpathContent,
+ classOf[java.util.List[java.util.Map[String, Object]]]
+ )
+ contentList.asScala.map(contentToBmlResource(userName, _)).foreach { bml =>
+ logger.info(s"Add a ${getBmlString(bml)} for user $userName and ticketId $ticketId")
+ bmlResources.add(bml)
+ }
+ properties.remove("spark.app.user.class.path.bml.json")
+ case _ =>
+ }
+ bmlResources
+ }
+
+ private def contentToBmlResource(userName: String, content: String): BmlResource = {
+ val contentMap = JsonUtils.jackson.readValue(content, classOf[java.util.Map[String, Object]])
+ contentToBmlResource(userName, contentMap)
+ }
+
+ private def contentToBmlResource(
+ userName: String,
+ contentMap: java.util.Map[String, Object]
+ ): BmlResource = {
+ val bmlResource = new BmlResource
+ bmlResource.setFileName(contentMap.get("fileName").asInstanceOf[String])
+ bmlResource.setResourceId(contentMap.get("resourceId").asInstanceOf[String])
+ bmlResource.setVersion(contentMap.get("version").asInstanceOf[String])
+ bmlResource.setOwner(userName)
+ bmlResource.setVisibility(BmlResource.BmlResourceVisibility.Private)
+ bmlResource
+ }
+
+ override protected def getNecessaryEnvironment(implicit
+ engineConnBuildRequest: EngineConnBuildRequest
+ ): Array[String] = if (isOnceMode) {
+ Array(SPARK_HOME_ENV, SPARK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment
+ } else {
+ super.getNecessaryEnvironment
+ }
+
+ override protected def getExtractJavaOpts: String = if (isOnceMode) {
+ if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) super.getExtractJavaOpts
+ else super.getExtractJavaOpts + s"
-DHADOOP_PROXY_USER=${variable(USER)}".trim
+ } else {
+ super.getExtractJavaOpts
+ }
+
+ override protected def ifAddHiveConfigPath: Boolean = if (isOnceMode) {
+ true
+ } else {
+ super.ifAddHiveConfigPath
+ }
+
+ override def enablePublicModule: Boolean = !isOnceMode
+
+ override protected def getEngineConnManagerHooks(implicit
+ engineConnBuildRequest: EngineConnBuildRequest
+ ): java.util.List[String] = if (isOnceMode) {
+ super.getEngineConnManagerHooks(engineConnBuildRequest)
+ } else {
+ Lists.newArrayList("JarUDFLoadECMHook")
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index f9b53f8e1..f25a1d55b 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -18,347 +18,152 @@
package org.apache.linkis.engineplugin.spark.launch
import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.engineplugin.spark.config.{SparkConfiguration, SparkResourceConfiguration}
-import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration.LINKIS_SPARK_DRIVER_MEMORY
-import org.apache.linkis.engineplugin.spark.launch.SparkSubmitProcessEngineConnLaunchBuilder.{
- getValueAndRemove,
- AbsolutePath,
- Path,
- RelativePath
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+ ENGINE_JAR,
+ SPARK_APP_NAME,
+ SPARK_DEFAULT_EXTERNAL_JARS_PATH,
+ SPARK_DRIVER_CLASSPATH,
+ SPARK_DRIVER_EXTRA_JAVA_OPTIONS,
+ SPARK_PYTHON_VERSION,
+ SPARK_SUBMIT_PATH
}
+import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
import org.apache.linkis.hadoop.common.conf.HadoopConf
-import org.apache.linkis.manager.common.entity.resource.{DriverAndYarnResource, NodeResource}
+import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.protocol.UserWithCreator
import org.apache.commons.lang3.StringUtils
-import java.lang.ProcessBuilder.Redirect
-import java.util
-
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import com.google.common.collect.Lists
-
-class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngineConnLaunchBuilder {
-
- private[this] val fsRoot = "hdfs://"
- protected var port: Int = _
- protected var request: EngineConnBuildRequest = _
- protected var userEngineResource: NodeResource = _
- private[this] var _executable: Path = _
- private[this] var _master: Option[String] = None
- private[this] var _deployMode: Option[String] = None
- private[this] var _className: Option[String] = None
- private[this] var _name: Option[String] = None
- private[this] var _jars: ArrayBuffer[Path] = ArrayBuffer()
- private[this] var _pyFiles: ArrayBuffer[Path] = ArrayBuffer()
- private[this] var _files: ArrayBuffer[Path] = ArrayBuffer()
- private[this] var _conf: ArrayBuffer[(String, String)] = ArrayBuffer()
- private[this] var _driverMemory: Option[String] = None
- private[this] var _driverJavaOptions: Option[String] = None
- private[this] var _driverClassPath: ArrayBuffer[String] = ArrayBuffer()
- private[this] var _executorMemory: Option[String] = None
- private[this] var _proxyUser: Option[String] = None
-
- private[this] var _driverCores: Option[String] = None
- private[this] var _executorCores: Option[String] = None
- private[this] var _queue: Option[String] = None
- private[this] var _numExecutors: Option[String] = None
- private[this] var _archives: ArrayBuffer[Path] = ArrayBuffer()
-
- private[this] var _env: ArrayBuffer[(String, String)] = ArrayBuffer()
- private[this] var _redirectOutput: Option[ProcessBuilder.Redirect] = None
- private[this] var _redirectError: Option[ProcessBuilder.Redirect] = None
- private[this] var _redirectErrorStream: Option[Boolean] = None
-
- private[this] var _userWithCreator: UserWithCreator =
- UserWithCreator("DefaultUser", "DefaultCreator")
-
- private[this] var _labels: ArrayBuffer[Label[_]] = ArrayBuffer()
-
- def executable(executable: Path): SparkSubmitProcessEngineConnLaunchBuilder
= {
- _executable = executable
- this
- }
-
- def jars(jars: Traversable[Path]): SparkSubmitProcessEngineConnLaunchBuilder
= {
- this._jars ++= jars
- this
- }
-
- def pyFile(pyFile: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
- this._pyFiles += pyFile
- this
- }
-
- def pyFiles(pyFiles: Traversable[Path]):
SparkSubmitProcessEngineConnLaunchBuilder = {
- this._pyFiles ++= pyFiles
- this
- }
-
- def files(files: Traversable[Path]):
SparkSubmitProcessEngineConnLaunchBuilder = {
- this._files ++= files
- this
- }
+class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLaunchBuilder) {
- def conf(conf: Traversable[(String, String)]):
SparkSubmitProcessEngineConnLaunchBuilder = {
- this._conf ++= conf
- this
- }
-
- def conf(conf: (String, String)): SparkSubmitProcessEngineConnLaunchBuilder =
- this.conf(conf._1, conf._2)
-
- def driverJavaOptions(driverJavaOptions: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _driverJavaOptions = Some(driverJavaOptions)
- this
- }
-
- def driverClassPaths(
- classPaths: Traversable[String]
- ): SparkSubmitProcessEngineConnLaunchBuilder = {
- _driverClassPath ++= classPaths
- this
- }
+ def getCommands(
+ engineConnBuildRequest: EngineConnBuildRequest,
+ mainClass: String,
+ gcLogDir: String,
+ logDir: String
+ ): Array[String] = {
+ val userEngineResource = engineConnBuildRequest.engineResource
+ val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
+ val properties = engineConnBuildRequest.engineConnCreationDesc.properties
- def archives(archives: Traversable[Path]):
SparkSubmitProcessEngineConnLaunchBuilder = {
- archives.foreach(archive)
- this
- }
+ val className = getValueAndRemove(properties, "className", mainClass)
+ val driverCores = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_CORES)
+ val driverMemory = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_MEMORY)
+ val executorCores = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_CORES)
+ val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY)
+ val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES)
+
+ val files = getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath)
+ val jars = new ArrayBuffer[String]()
+ jars ++= getValueAndRemove(properties, "jars", "").split(",").filter(isNotBlankPath)
+ jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
+ .split(",")
+ .filter(x => {
+ isNotBlankPath(x) && (new java.io.File(x)).isFile
+ })
+ val pyFiles = getValueAndRemove(properties, "py-files", "").split(",").filter(isNotBlankPath)
+ val archives = getValueAndRemove(properties, "archives", "").split(",").filter(isNotBlankPath)
- def archive(archive: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
- _archives += archive
- this
- }
+ val queue = if (null != darResource) {
+ darResource.yarnResource.queueName
+ } else {
+ "default"
+ }
- def redirectError(
- redirect: ProcessBuilder.Redirect
- ): SparkSubmitProcessEngineConnLaunchBuilder = {
- _redirectError = Some(redirect)
- this
- }
+ val driverClassPath =
+ Array(getValueAndRemove(properties, SPARK_DRIVER_CLASSPATH), variable(CLASSPATH))
- def setPort(port: Int): Unit = this.port = port
+ var userWithCreator: UserWithCreator = UserWithCreator("DefaultUser", "DefaultCreator")
+ engineConnBuildRequest.labels.asScala.foreach {
+ case label: UserCreatorLabel =>
+ userWithCreator = UserWithCreator(label.getUser, label.getCreator)
+ case _ =>
+ }
+ val appName = getValueAndRemove(properties, SPARK_APP_NAME) + "_" +
userWithCreator.creator
- override protected def getCommands(implicit
- engineConnBuildRequest: EngineConnBuildRequest
- ): Array[String] = {
- build(engineConnBuildRequest)
val commandLine: ArrayBuffer[String] = ArrayBuffer[String]()
- commandLine += SparkConfiguration.SPARK_SUBMIT_PATH.getValue
-
- def addOpt(option: String, value: Option[String]): Unit = {
- value.foreach { v =>
- commandLine += option
- commandLine += v
- }
- }
+ commandLine += SPARK_SUBMIT_PATH.getValue
- def addList(option: String, values: Traversable[String]): Unit = {
- if (values.nonEmpty) {
+ def addOpt(option: String, value: String): Unit = {
+ if (StringUtils.isNotBlank(value)) {
commandLine += option
- commandLine += values.mkString(",")
+ commandLine += value
}
}
- def addClasspath(option: String, values: Traversable[String]): Unit = {
- if (values.nonEmpty) {
- commandLine += option
- commandLine += values.mkString(":")
+ def addProxyUser(): Unit = {
+ if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) return
+ val proxyUser = getValueAndRemove(properties, "proxyUser", "")
+ if (StringUtils.isNotBlank(proxyUser)) {
+ addOpt("--proxy-user", proxyUser)
+ } else {
+ addOpt("--proxy-user", userWithCreator.user)
}
}
- addOpt("--master", _master)
- addOpt("--deploy-mode", _deployMode)
- addOpt("--name", _name)
-
- if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue && _proxyUser.nonEmpty) {
- addOpt("--proxy-user", _proxyUser)
+ def getMemory(memory: String): String = if (StringUtils.isNumeric(memory)) {
+ memory + "g"
+ } else {
+ memory
}
- // addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue))
- // info("No need to add jars for " + _jars.map(fromPath).exists(x =>
x.equals("hdfs:///")).toString())
- _jars = _jars.filter(_.isNotBlankPath())
-
- if (_jars.isEmpty) {
- _jars += AbsolutePath("")
- }
- _jars += AbsolutePath(variable(UDF_JARS))
- if (_jars.nonEmpty) {
- addList("--jars", _jars.map(fromPath))
- }
+ addOpt("--master", "yarn")
+ addOpt("--deploy-mode", "client")
+ addOpt("--name", appName)
+ addProxyUser()
- _pyFiles = _pyFiles.filter(_.isNotBlankPath())
- if (_pyFiles.nonEmpty) {
- addList("--py-files", _pyFiles.map(fromPath))
+ if (jars.isEmpty) {
+ jars += ""
}
+ jars += variable(UDF_JARS)
- _files = _files.filter(_.isNotBlankPath())
- if (_files.nonEmpty) {
- addList("--files", _files.map(fromPath))
- }
+ addOpt("--jars", jars.mkString(","))
+ addOpt("--py-files", pyFiles.mkString(","))
+ addOpt("--files", files.mkString(","))
+ addOpt("--archives", archives.mkString(","))
+ addOpt("--driver-class-path", driverClassPath.mkString(":"))
+ addOpt("--driver-memory", getMemory(driverMemory))
+ addOpt("--driver-cores", driverCores.toString)
+ addOpt("--executor-memory", getMemory(executorMemory))
+ addOpt("--executor-cores", executorCores.toString)
+ addOpt("--num-executors", numExecutors.toString)
+ addOpt("--queue", queue)
- _archives = _archives.filter(_.isNotBlankPath())
- if (_archives.nonEmpty) {
- addList("--archives", _archives.map(fromPath))
- }
- _conf.foreach { case (key, value) =>
- if (key.startsWith("spark.")) {
- // subcommand cannot be quoted by double quote, use single quote
instead
- addOpt("--conf", Some(key + "=\"" + value + "\""))
- }
+ getConf(engineConnBuildRequest, gcLogDir, logDir).foreach { case (key, value) =>
+ addOpt("--conf", s"""$key="$value"""")
}
- addOpt("--driver-memory", _driverMemory)
- addClasspath("--driver-class-path", _driverClassPath)
- addOpt("--driver-cores", _driverCores)
- addOpt("--executor-memory", _executorMemory)
- addOpt("--executor-cores", _executorCores)
- addOpt("--num-executors", _numExecutors)
- addOpt("--queue", _queue)
-
- addOpt("--class", _className)
- addOpt("1>", Some(s"${variable(LOG_DIRS)}/stdout"))
- addOpt("2>>", Some(s"${variable(LOG_DIRS)}/stderr"))
- addOpt("", Some(s"
${variable(PWD)}/lib/${SparkConfiguration.ENGINE_JAR.getValue}"))
+ addOpt("--class", className)
+ addOpt("1>", s"${variable(LOG_DIRS)}/stdout")
+ addOpt("2>>", s"${variable(LOG_DIRS)}/stderr")
+ addOpt("", s" ${variable(PWD)}/lib/${ENGINE_JAR.getValue}")
commandLine.toArray.filter(StringUtils.isNotEmpty)
}
- override def enablePublicModule = true
-
- def master(masterUrl: String): SparkSubmitProcessEngineConnLaunchBuilder = {
- _master = Some(masterUrl)
- this
- }
-
- def deployMode(deployMode: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _deployMode = Some(deployMode)
- this
- }
-
- def className(className: String): SparkSubmitProcessEngineConnLaunchBuilder
= {
- _className = Some(className)
- this
- }
-
- def name(name: String): SparkSubmitProcessEngineConnLaunchBuilder = {
- _name = Some(name)
- this
- }
-
- def jar(jar: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
- this._jars += jar
- this
- }
-
- def file(file: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
- this._files += file
- this
- }
-
- def conf(key: String, value: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- this._conf += ((key, value))
- this
- }
-
- def driverMemory(driverMemory: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _driverMemory = Some(driverMemory)
- this
- }
-
- def driverClassPath(classPath: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _driverClassPath += classPath
- this
- }
-
- def executorMemory(executorMemory: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _executorMemory = Some(executorMemory)
- this
- }
-
- def proxyUser(proxyUser: String): SparkSubmitProcessEngineConnLaunchBuilder
= {
- _proxyUser = Some(proxyUser)
- this
- }
-
- def driverCores(driverCores: Int): SparkSubmitProcessEngineConnLaunchBuilder
= {
- this.driverCores(driverCores.toString)
- }
-
- def driverCores(driverCores: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _driverCores = Some(driverCores)
- this
- }
-
- def executorCores(executorCores: Int):
SparkSubmitProcessEngineConnLaunchBuilder = {
- this.executorCores(executorCores.toString)
- }
-
- def executorCores(executorCores: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _executorCores = Some(executorCores)
- this
- }
-
- def numExecutors(numExecutors: Int):
SparkSubmitProcessEngineConnLaunchBuilder = {
- this.numExecutors(numExecutors.toString)
- }
-
- def numExecutors(numExecutors: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _numExecutors = Some(numExecutors)
- this
- }
-
- def queue(queue: String): SparkSubmitProcessEngineConnLaunchBuilder = {
- _queue = Some(queue)
- this
- }
-
- def env(key: String, value: String):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _env += ((key, value))
- this
- }
-
- def redirectOutput(
- redirect: ProcessBuilder.Redirect
- ): SparkSubmitProcessEngineConnLaunchBuilder = {
- _redirectOutput = Some(redirect)
- this
- }
-
- def redirectErrorStream(redirect: Boolean):
SparkSubmitProcessEngineConnLaunchBuilder = {
- _redirectErrorStream = Some(redirect)
- this
- }
-
- def getEngineResource: NodeResource = userEngineResource
-
- def build(engineRequest: EngineConnBuildRequest): Unit = {
- this.request = engineRequest
- this.userEngineResource = engineRequest.engineResource
- val darResource: DriverAndYarnResource =
- userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
- val properties = engineRequest.engineConnCreationDesc.properties
- this.master("yarn")
- this.deployMode("client")
+ def getConf(
+ engineConnBuildRequest: EngineConnBuildRequest,
+ gcLogDir: String,
+ logDir: String
+ ): ArrayBuffer[(String, String)] = {
val driverJavaSet = new StringBuilder(" -server")
-
if
(StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue))
{
EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue
- .format(getGcLogDir(engineRequest))
+ .format(gcLogDir)
.split("\\s+")
.foreach(l => {
driverJavaSet.append(" ").append(l)
})
}
- getLogDir(engineRequest).trim
+ logDir.trim
.split(" ")
.foreach(l => {
driverJavaSet.append(" ").append(l)
@@ -369,142 +174,35 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin
s"
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${variable(RANDOM_PORT)}"
)
}
- this.conf(SparkConfiguration.SPARK_DRIVER_EXTRA_JAVA_OPTIONS.key,
driverJavaSet.toString())
- // this.conf("spark.sql.extensions",
"org.apache.linkis.hook.spark.extension.SparkHistoryExtension")
- this.className(getValueAndRemove(properties, "className", getMainClass))
-
- getValueAndRemove(properties, "archives", "").toString
- .split(",")
- .map(AbsolutePath)
- .foreach(this.archive)
- this.driverCores(
- getValueAndRemove(properties,
SparkResourceConfiguration.LINKIS_SPARK_DRIVER_CORES)
- )
- val driverMemory = getValueAndRemove(properties,
LINKIS_SPARK_DRIVER_MEMORY)
- val driverMemoryWithUnit = if (StringUtils.isNumeric(driverMemory)) {
- driverMemory + "g"
- } else {
- driverMemory
- }
- this.driverMemory(driverMemoryWithUnit)
- this.executorCores(
- getValueAndRemove(properties,
SparkResourceConfiguration.LINKIS_SPARK_EXECUTOR_CORES)
- )
- val executorMemory =
- getValueAndRemove(properties,
SparkResourceConfiguration.LINKIS_SPARK_EXECUTOR_MEMORY)
- val executorMemoryWithUnit = if (StringUtils.isNumeric(executorMemory)) {
- executorMemory + "g"
- } else {
- executorMemory
- }
- this.executorMemory(executorMemoryWithUnit)
- this.numExecutors(
- getValueAndRemove(properties,
SparkResourceConfiguration.LINKIS_SPARK_EXECUTOR_INSTANCES)
- )
- getValueAndRemove(properties, "files",
"").split(",").map(AbsolutePath).foreach(file)
- getValueAndRemove(properties, "jars",
"").split(",").map(AbsolutePath).foreach(jar)
- val defaultExternalJars =
- getValueAndRemove(properties,
SparkConfiguration.SPARK_DEFAULT_EXTERNAL_JARS_PATH)
- defaultExternalJars
- .split(",")
- .map(AbsolutePath)
- .filter(x => {
- val file = new java.io.File(x.path)
- file.isFile
- })
- .foreach(jar)
-
- proxyUser(getValueAndRemove(properties, "proxyUser", ""))
- if (null != darResource) {
- this.queue(darResource.yarnResource.queueName)
- } else {
- this.queue("default")
- }
-
- this.driverClassPath(getValueAndRemove(properties,
SparkConfiguration.SPARK_DRIVER_CLASSPATH))
- this.driverClassPath(variable(CLASSPATH))
- this.redirectOutput(Redirect.PIPE)
- this.redirectErrorStream(true)
-
- val labels = engineRequest.labels.asScala
- labels.foreach { l =>
- {
- this._labels += l
- l match {
- case label: UserCreatorLabel =>
- this._userWithCreator = UserWithCreator(label.getUser,
label.getCreator)
- case _ =>
- }
- }
- }
- if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
- this.proxyUser(getValueAndRemove(properties, "proxyUser", ""))
- } else {
- this.proxyUser(this._userWithCreator.user)
- }
+ val conf: ArrayBuffer[(String, String)] = ArrayBuffer()
+ conf += (SPARK_DRIVER_EXTRA_JAVA_OPTIONS.key -> driverJavaSet.toString())
// deal spark conf and spark.hadoop.*
+ val properties = engineConnBuildRequest.engineConnCreationDesc.properties
val iterator = properties.entrySet().iterator()
val sparkConfKeys = ArrayBuffer[String]()
while (iterator.hasNext) {
val keyValue = iterator.next()
if (
- !SparkConfiguration.SPARK_PYTHON_VERSION.key.equals(keyValue.getKey)
&& keyValue.getKey
- .startsWith("spark.") && StringUtils.isNotBlank(keyValue.getValue)
+ !SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) &&
+ keyValue.getKey.startsWith("spark.") &&
+ StringUtils.isNotBlank(keyValue.getValue)
) {
- conf(keyValue.getKey, keyValue.getValue)
+ conf += (keyValue.getKey -> keyValue.getValue)
sparkConfKeys += keyValue.getKey
}
}
- this.name(
- getValueAndRemove(
- properties,
- SparkConfiguration.SPARK_APP_NAME
- ) + "_" + this._userWithCreator.creator
- )
sparkConfKeys.foreach(properties.remove(_))
+ conf
}
- private def fromPath(path: Path): String = path match {
- case AbsolutePath(p) => p
- case RelativePath(p) => p
- }
-
- override protected def getEngineConnManagerHooks(implicit
- engineConnBuildRequest: EngineConnBuildRequest
- ): util.List[String] = {
- Lists.newArrayList("JarUDFLoadECMHook")
- }
-
-}
-
-object SparkSubmitProcessEngineConnLaunchBuilder {
-
- def newBuilder(): SparkSubmitProcessEngineConnLaunchBuilder =
- new SparkSubmitProcessEngineConnLaunchBuilder
-
- sealed trait Path {
-
- def isNotBlankPath(): Boolean;
-
- protected def isNotBlankPath(path: String): Boolean = {
- StringUtils.isNotBlank(path) && !"/".equals(path.trim) &&
!"hdfs:///".equals(
- path.trim
- ) && !"file:///".equals(path.trim)
- }
-
- }
-
- case class AbsolutePath(path: String) extends Path {
- override def isNotBlankPath(): Boolean = isNotBlankPath(path)
- }
-
- case class RelativePath(path: String) extends Path {
- override def isNotBlankPath(): Boolean = isNotBlankPath(path)
+ private def isNotBlankPath(path: String): Boolean = {
+ StringUtils.isNotBlank(path) && !"/".equals(path.trim) &&
+ !"hdfs:///".equals(path.trim) && !"file:///".equals(path.trim)
}
- def getValueAndRemove[T](
+ private def getValueAndRemove[T](
properties: java.util.Map[String, String],
commonVars: CommonVars[T]
): T = {
@@ -513,7 +211,7 @@ object SparkSubmitProcessEngineConnLaunchBuilder {
value
}
- def getValueAndRemove(
+ private def getValueAndRemove(
properties: java.util.Map[String, String],
key: String,
defaultValue: String
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
new file mode 100644
index 000000000..196414420
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.utils
+
+import org.apache.linkis.common.utils.{JsonUtils, Logging}
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.SPARK_ONCE_YARN_RESTFUL_URL
+import org.apache.linkis.protocol.engine.JobProgressInfo
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+object SparkJobProgressUtil extends Logging {
+
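+ // Overall progress = completed tasks / total tasks, summed over the Spark jobs reported as RUNNING or SUCCEEDED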
+ def getProgress(applicationId: String): Float = {
+ if (StringUtils.isBlank(applicationId)) return 0f
+ val sparkJobsResult = getSparkJobInfo(applicationId)
+ if (sparkJobsResult.isEmpty) return 0f
+ val tuple = sparkJobsResult
+ .filter(sparkJobResult => {
+ val status = sparkJobResult.getOrDefault("status", "").asInstanceOf[String]
+ "RUNNING".equals(status) || "SUCCEEDED".equals(status)
+ })
+ .map(result =>
+ (
+ result.get("numTasks").asInstanceOf[Integer],
+ result.get("numCompletedTasks").asInstanceOf[Integer]
+ )
+ )
+ .reduce((total, completed) => (total._1 + completed._1, total._2 + completed._2))
+ tuple._2.toFloat / tuple._1
+ }
+
+ def getSparkJobProgressInfo(applicationId: String): Array[JobProgressInfo] = {
+ val sparkJobsResult = getSparkJobInfo(applicationId)
+ if (sparkJobsResult.isEmpty) {
+ Array.empty
+ } else {
+ sparkJobsResult.map(sparkJobResult =>
+ JobProgressInfo(
+ getJobId(
+ sparkJobResult.get("jobId").asInstanceOf[Integer],
+ sparkJobResult.get("jobGroup").asInstanceOf[String]
+ ),
+ sparkJobResult.get("numTasks").asInstanceOf[Integer],
+ sparkJobResult.get("numActiveTasks").asInstanceOf[Integer],
+ sparkJobResult.get("numFailedTasks").asInstanceOf[Integer],
+ sparkJobResult.get("numCompletedTasks").asInstanceOf[Integer]
+ )
+ )
+ }
+ }
+
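+ // Looks up the YARN application first; if it is still running and exposes a trackingUrl, queries the Spark UI REST API behind it for per-job task counts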
+ def getSparkJobInfo(applicationId: String): Array[java.util.Map[String, Object]] =
+ if (StringUtils.isBlank(applicationId)) Array.empty
+ else {
+ val yarnRestfulUrl =
+ SPARK_ONCE_YARN_RESTFUL_URL.getValue(EngineConnServer.getEngineCreationContext.getOptions)
+ val getAppUrl = s"$yarnRestfulUrl/ws/v1/cluster/apps/$applicationId"
+ logger.info(s"get yarn app, url: $getAppUrl")
+ val appResult =
+ JsonUtils.jackson.readValue(get(getAppUrl), classOf[java.util.Map[String, Object]])
+ val app = appResult.get("app").asInstanceOf[java.util.Map[String, Object]]
+ if (app == null) return Array.empty
+ val trackingUrl = app.getOrDefault("trackingUrl", "").asInstanceOf[String]
+ val state = app.getOrDefault("state", "").asInstanceOf[String]
+ if (StringUtils.isBlank(trackingUrl) || "FINISHED".equals(state)) {
+ return Array.empty
+ }
+ val getSparkJobsUrl = s"${trackingUrl}api/v1/applications/$applicationId/jobs"
+ logger.info(s"get spark jobs, url: $getSparkJobsUrl")
+ val jobs = get(getSparkJobsUrl)
+ if (StringUtils.isBlank(jobs)) {
+ return Array.empty
+ }
+ JsonUtils.jackson.readValue(jobs, classOf[Array[java.util.Map[String, Object]]])
+ }
+
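+ // Simple blocking HTTP GET: returns the response body on HTTP 200, otherwise an empty string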
+ def get(url: String): String = {
+ val httpGet = new HttpGet(url)
+ val client = HttpClients.createDefault
+ val response = client.execute(httpGet)
+ if (response.getStatusLine.getStatusCode == 200) {
+ val str = EntityUtils.toString(response.getEntity, "UTF-8")
+ logger.info(s"url: $url")
+ logger.info(s"response: $str")
+ str
+ } else {
+ ""
+ }
+ }
+
+ private def getJobId(jobId: Int, jobGroup: String): String =
+ "jobId-" + jobId + "(" + jobGroup + ")"
+
+}
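
A minimal usage sketch for the new SparkJobProgressUtil (the applicationId below is made up for illustration; in the engine it would come from the launched YARN application):

val applicationId = "application_1671500000000_0001" // illustrative id only
val progress: Float = SparkJobProgressUtil.getProgress(applicationId)
val jobInfos = SparkJobProgressUtil.getSparkJobProgressInfo(applicationId)
println(s"overall progress: $progress, spark jobs reported: ${jobInfos.length}")
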
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala
index 3cb76869b..1c9d45b1e 100644
--- a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -17,21 +17,13 @@
package org.apache.linkis.engineplugin.spark.launch
-import org.apache.linkis.engineplugin.spark.launch.SparkSubmitProcessEngineConnLaunchBuilder.RelativePath
-
import org.junit.jupiter.api.{Assertions, Test}
class TestSparkSubmitProcessEngineConnLaunchBuilder {
@Test
def testCreateContext: Unit = {
- val ddd = SparkSubmitProcessEngineConnLaunchBuilder
- .newBuilder()
- .master("local[1]")
- .deployMode("client")
- .className("org.apache.linkis.engineplugin.spark.launch")
- .driverClassPath("")
- .archive(RelativePath(""))
+ val ddd = new SparkEngineConnLaunchBuilder()
Assertions.assertNotNull(ddd)
}
diff --git a/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala b/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala
index 90fc3acd9..5aea274df 100644
--- a/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala
+++ b/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala
@@ -46,12 +46,7 @@ class SqoopEngineConnPlugin extends EngineConnPlugin {
}
override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
- EP_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
- if (null == engineConnLaunchBuilder) {
- engineConnLaunchBuilder = new SqoopEngineConnLaunchBuilder()
- }
- engineConnLaunchBuilder
- }
+ new SqoopEngineConnLaunchBuilder()
}
override def getEngineConnFactory: EngineConnFactory = {
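
The getEngineConnLaunchBuilder change above drops the lock-guarded, cached builder in favor of constructing a fresh builder on every call; a likely motivation is to avoid sharing per-launch builder state through the plugin singleton. The resulting pattern is simply:

override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder =
  new SqoopEngineConnLaunchBuilder()
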
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]