This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new e124d91a6 [Feature][linkis-engineplugin-spark] spark submit jar (engineConnMode is once) (#3966)
e124d91a6 is described below

commit e124d91a652efd6a77aa2cff952bbacef6875e6d
Author: rarexixi <[email protected]>
AuthorDate: Tue Dec 20 11:37:45 2022 +0800

    [Feature][linkis-engineplugin-spark] spark submit jar (engineConnMode is once) (#3966)
    
    * [Feature][linkis-engineplugin-spark] spark submit jar (engineConnMode is once)
---
 .../once/action/CreateEngineConnAction.scala       |   2 +-
 .../computation/client/SparkOnceJobTest.java       | 127 +++++
 .../engineconn/common/conf/EngineConnConf.scala    |   5 +
 .../engineconn/common/hook/EngineConnHook.scala    |  36 +-
 .../engineconn/launch/EngineConnServer.scala       |  32 +-
 .../common/launch/EngineConnLaunchBuilder.scala    |   7 +-
 .../process/ProcessEngineConnLaunchBuilder.scala   |   4 +-
 .../service/DefaultEngineConnLaunchService.scala   |  24 +-
 .../label/entity/engine/EngineConnModeLabel.scala  |   5 +
 .../linkis/manager/label/utils/LabelUtil.scala     |  15 +
 .../openlookeng/OpenLooKengECPlugin.scala          |  10 +-
 .../seatunnel/SeatunnelEngineConnPlugin.scala      |   7 +-
 linkis-engineconn-plugins/spark/pom.xml            |  12 +
 .../spark/client/context/ExecutionContext.java     |  17 +-
 .../spark/client/context/SparkConfig.java          | 370 +++++++++++++++
 .../deployment/ClusterDescriptorAdapter.java       |  98 ++++
 .../ClusterDescriptorAdapterFactory.java           |  16 +-
 .../YarnApplicationClusterDescriptorAdapter.java   | 122 +++++
 .../spark/errorcode/SparkErrorCodeSummary.java     |   7 +
 .../spark/exception/ExecutorInitException.java     |  15 +-
 .../spark/exception/JobExecutionException.java     |  15 +-
 .../main/resources/linkis-engineconn.properties    |   2 +
 .../engineplugin/spark/SparkEngineConnPlugin.scala |   6 +-
 .../spark/config/SparkConfiguration.scala          |  18 +-
 .../spark/context/EnvironmentContext.scala         |  65 +++
 .../spark/context/SparkEngineConnContext.scala     |  18 +-
 .../spark/executor/SparkExecutor.scala             |  64 +++
 .../spark/executor/SparkOnceExecutor.scala         | 158 +++++++
 .../spark/executor/SparkSubmitOnceExecutor.scala   | 145 ++++++
 .../spark/factory/SparkEngineConnFactory.scala     |  79 +++-
 .../spark/factory/SparkOnceExecutorFactory.scala   |  44 ++
 .../launch/SparkEngineConnLaunchBuilder.scala      | 171 +++++++
 ...SparkSubmitProcessEngineConnLaunchBuilder.scala | 522 +++++----------------
 .../spark/utils/SparkJobProgressUtil.scala         | 116 +++++
 ...SparkSubmitProcessEngineConnLaunchBuilder.scala |  10 +-
 .../sqoop/SqoopEngineConnPlugin.scala              |   7 +-
 36 files changed, 1839 insertions(+), 532 deletions(-)

diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
index 403644e4d..f5210a1b8 100644
--- 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/action/CreateEngineConnAction.scala
@@ -89,7 +89,7 @@ object CreateEngineConnAction {
       action.addRequestPayload("properties", properties)
       action.addRequestPayload("labels", labels)
       action.addRequestPayload("createService", createService)
-      action.addRequestPayload("timeOut", maxSubmitTime)
+      action.addRequestPayload("timeout", maxSubmitTime)
       action.addRequestPayload("description", description)
       action.addRequestPayload("ignoreTimeout", ignoreTimeout)
       action
diff --git 
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
new file mode 100644
index 000000000..963c8569f
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/test/java/org/apache/linkis/computation/client/SparkOnceJobTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.computation.client;
+
+import 
org.apache.linkis.computation.client.once.simple.SubmittableSimpleOnceJob;
+import org.apache.linkis.computation.client.operator.impl.*;
+
+import java.util.ArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkOnceJobTest {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SparkOnceJobTest.class);
+
+  public static void main(String[] args) throws InterruptedException {
+
+    LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");
+
+    String submitUser = "linkis";
+    String engineType = "spark";
+
+    SubmittableSimpleOnceJob onceJob =
+        // region
+        LinkisJobClient.once()
+            .simple()
+            .builder()
+            .setCreateService("Spark-Test")
+            .setMaxSubmitTime(300000)
+            .setDescription("SparkTestDescription")
+            .addExecuteUser(submitUser)
+            .addJobContent("runType", "jar")
+            .addJobContent("spark.app.main.class", 
"org.apache.spark.examples.JavaWordCount")
+            .addJobContent("spark.app.args", "hdfs:///tmp/log.log -a 10 -b=12")
+            .addJobContent(
+                "spark.extconf", 
"spark.a=d\nspark.c=d\nspark.args.start_date=2022-06-14")
+            .addLabel("engineType", engineType + "-2.4.7")
+            .addLabel("userCreator", submitUser + "-IDE")
+            .addLabel("engineConnMode", "once")
+            .addStartupParam("spark.app.name", "spark-submit-jar-test-xi")
+            .addStartupParam("spark.executor.memory", "1g")
+            .addStartupParam("spark.driver.memory", "1g")
+            .addStartupParam("spark.executor.cores", "1")
+            .addStartupParam("spark.executor.instance", "1")
+            .addStartupParam(
+                "spark.app.resource", 
"hdfs:///spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
+            .addSource("jobName", "OnceJobTest")
+            .build();
+    // endregion
+    onceJob.submit();
+    logger.info("jobId: {}", onceJob.getId());
+
+    EngineConnLogOperator logOperator =
+        (EngineConnLogOperator) 
onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME());
+    int fromLine = 1;
+    int logSize = 1000; // log lines
+    logOperator.setPageSize(logSize);
+    logOperator.setEngineConnType("spark");
+
+    ArrayList<String> logLines;
+
+    // print log
+    while (true) {
+      try {
+        logOperator.setFromLine(fromLine);
+        EngineConnLogs logs = (EngineConnLogs) logOperator.apply();
+        logLines = logs.logs();
+        if (logLines == null || logLines.isEmpty()) {
+          if (!isCompleted(onceJob, 3)) {
+            Thread.sleep(2000);
+            continue;
+          } else {
+            break;
+          }
+        }
+        for (String log : logLines) {
+          System.out.println(log);
+        }
+        fromLine += logLines.size();
+      } catch (Exception e) {
+        logger.error("Failed to get log information", e);
+        break;
+      }
+    }
+
+    boolean complete = false;
+    // wait complete
+    while (!complete) {
+      try {
+        complete = onceJob.isCompleted();
+      } catch (Exception e) {
+        logger.error("isCompleted error", e);
+      }
+    }
+
+    String finalStatus = onceJob.getStatus();
+    logger.info("final status " + finalStatus);
+  }
+
+  static boolean isCompleted(SubmittableSimpleOnceJob onceJob, int times)
+      throws InterruptedException {
+    if (times == 0) return false;
+    try {
+      return onceJob.isCompleted();
+    } catch (Exception e) {
+      logger.error("isCompleted error", e);
+      Thread.sleep(2000);
+      return isCompleted(onceJob, times - 1);
+    }
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
index cbaa23376..1518f00a6 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
@@ -35,6 +35,11 @@ object EngineConnConf {
     
"org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook"
   )
 
+  val ENGINE_CONN_ONCE_HOOKS = CommonVars(
+    "linkis.engine.connector.once.hooks",
+    
"org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook"
+  )
+
   val ENGINE_LAUNCH_CMD_PARAMS_USER_KEY =
     CommonVars("wds.linkis.engine.launch.cmd.params.user.key", "user")
 
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
index 5a73678a1..f60bf6d65 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/hook/EngineConnHook.scala
@@ -52,27 +52,33 @@ trait EngineConnHook {
 
 object EngineConnHook extends Logging {
 
-  private val engineConnHooks = initEngineConnHooks
+  private var engineConnHooks: Array[EngineConnHook] = _
 
-  private def initEngineConnHooks: Array[EngineConnHook] = {
+  private def initEngineConnHooks(isOnceMode: Boolean): Unit = {
 
-    val hooks = EngineConnConf.ENGINE_CONN_HOOKS.getValue
+    val hooks = if (isOnceMode) {
+      EngineConnConf.ENGINE_CONN_ONCE_HOOKS.getValue
+    } else {
+      EngineConnConf.ENGINE_CONN_HOOKS.getValue
+    }
     if (StringUtils.isNotBlank(hooks)) {
-      val clazzArr = hooks.split(",")
-      if (null != clazzArr && clazzArr.nonEmpty) {
-        clazzArr
-          .map { clazz =>
-            Utils.tryAndWarn(Utils.getClassInstance[EngineConnHook](clazz))
-          }
-          .filter(_ != null)
-      } else {
-        Array.empty
-      }
+      engineConnHooks = hooks
+        .split(",")
+        .map(_.trim)
+        .filter(StringUtils.isNotBlank)
+        .map(Utils.tryAndWarn(Utils.getClassInstance[EngineConnHook](_)))
+        .filter(_ != null)
     } else {
-      Array.empty
+      engineConnHooks = Array.empty
     }
 
   }
 
-  def getEngineConnHooks: Array[EngineConnHook] = engineConnHooks
+  def getEngineConnHooks(isOnceMode: Boolean = false): Array[EngineConnHook] = 
{
+    if (engineConnHooks == null) {
+      initEngineConnHooks(isOnceMode)
+    }
+    engineConnHooks
+  }
+
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
index 88f9112b3..92bd3cd75 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-core/src/main/scala/org/apache/linkis/engineconn/launch/EngineConnServer.scala
@@ -46,6 +46,7 @@ import org.apache.linkis.manager.label.builder.factory.{
   LabelBuilderFactoryContext
 }
 import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.EngineConnMode
 
 import org.apache.commons.lang3.exception.ExceptionUtils
 
@@ -59,6 +60,8 @@ object EngineConnServer extends Logging {
   private val labelBuilderFactory: LabelBuilderFactory =
     LabelBuilderFactoryContext.getLabelBuilderFactory
 
+  private var onceMode: Boolean = false
+
   def main(args: Array[String]): Unit = {
     logger.info("<<---------------------EngineConnServer Start 
--------------------->>")
 
@@ -76,14 +79,13 @@ object EngineConnServer extends Logging {
         "Finished to create EngineCreationContext, EngineCreationContext 
content: " + EngineConnUtils.GSON
           .toJson(engineCreationContext)
       )
-      
EngineConnHook.getEngineConnHooks.foreach(_.beforeCreateEngineConn(getEngineCreationContext))
+      val ecHooks = EngineConnHook.getEngineConnHooks(onceMode)
+      ecHooks.foreach(_.beforeCreateEngineConn(getEngineCreationContext))
       logger.info("Finished to execute hook of beforeCreateEngineConn.")
       // 2. cresate EngineConn
       val engineConn = 
getEngineConnManager.createEngineConn(getEngineCreationContext)
       logger.info(s"Finished to create ${engineConn.getEngineConnType} 
EngineConn.")
-      EngineConnHook.getEngineConnHooks.foreach(
-        _.beforeExecutionExecute(getEngineCreationContext, engineConn)
-      )
+      ecHooks.foreach(_.beforeExecutionExecute(getEngineCreationContext, 
engineConn))
       logger.info("Finished to execute all hooks of beforeExecutionExecute.")
       // 3. register executions
       Utils.tryThrow(executeEngineConn(engineConn)) { t =>
@@ -96,19 +98,14 @@ object EngineConnServer extends Logging {
       }
       EngineConnObject.setReady()
       logger.info("Finished to execute executions.")
-      EngineConnHook.getEngineConnHooks.foreach(
-        _.afterExecutionExecute(getEngineCreationContext, engineConn)
-      )
+      ecHooks.foreach(_.afterExecutionExecute(getEngineCreationContext, 
engineConn))
       logger.info("Finished to execute hook of afterExecutionExecute")
-      EngineConnHook.getEngineConnHooks.foreach(
-        _.afterEngineServerStartSuccess(getEngineCreationContext, engineConn)
-      )
+      
ecHooks.foreach(_.afterEngineServerStartSuccess(getEngineCreationContext, 
engineConn))
     } catch {
       case t: Throwable =>
         logger.error("EngineConnServer Start Failed.", t)
-        EngineConnHook.getEngineConnHooks.foreach(
-          _.afterEngineServerStartFailed(getEngineCreationContext, t)
-        )
+        val ecHooks = EngineConnHook.getEngineConnHooks(onceMode)
+        
ecHooks.foreach(_.afterEngineServerStartFailed(getEngineCreationContext, t))
         System.exit(1)
     }
 
@@ -132,8 +129,11 @@ object EngineConnServer extends Logging {
     val labelArgs = 
engineConf.filter(_._1.startsWith(EngineConnArgumentsParser.LABEL_PREFIX))
     if (labelArgs.nonEmpty) {
       labelArgs.foreach { case (key, value) =>
-        labels += labelBuilderFactory
-          
.createLabel[Label[_]](key.replace(EngineConnArgumentsParser.LABEL_PREFIX, ""), 
value)
+        val realKey = 
key.substring(EngineConnArgumentsParser.LABEL_PREFIX.length)
+        if ("engineConnMode".equals(realKey)) {
+          onceMode = EngineConnMode.isOnceMode(value)
+        }
+        labels += labelBuilderFactory.createLabel[Label[_]](realKey, value)
       }
       engineCreationContext.setLabels(labels.toList.asJava)
     }
@@ -160,6 +160,8 @@ object EngineConnServer extends Logging {
     }
   }
 
+  def isOnceMode: Boolean = this.onceMode
+
   def getEngineCreationContext: EngineCreationContext = 
this.engineCreationContext
 
   private def getEngineConnManager: EngineConnManager = 
EngineConnManager.getEngineConnManager
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
index c0e919863..11c445447 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
@@ -24,6 +24,11 @@ import 
org.apache.linkis.manager.engineplugin.common.launch.entity.{
 
 trait EngineConnLaunchBuilder {
 
-  def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): 
EngineConnLaunchRequest
+  protected var engineConnBuildRequest: EngineConnBuildRequest = _
+
+  def setBuildRequest(engineConnBuildRequest: EngineConnBuildRequest): Unit =
+    this.engineConnBuildRequest = engineConnBuildRequest
+
+  def buildEngineConn(): EngineConnLaunchRequest
 
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
index 1a5905ac1..ae35b18df 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
+++ 
b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/ProcessEngineConnLaunchBuilder.scala
@@ -52,9 +52,7 @@ trait ProcessEngineConnLaunchBuilder extends 
EngineConnLaunchBuilder {
       engineConnBuildRequest: EngineConnBuildRequest
   ): util.List[String] = new util.ArrayList[String]
 
-  override def buildEngineConn(
-      engineConnBuildRequest: EngineConnBuildRequest
-  ): EngineConnLaunchRequest = {
+  override def buildEngineConn(): EngineConnLaunchRequest = {
     implicit val engineConnBuildRequestImplicit: EngineConnBuildRequest = 
engineConnBuildRequest
     val bmlResources = new util.ArrayList[BmlResource]
     engineConnBuildRequest match {
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
index 52d830d32..cb3b03fa5 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.scala
@@ -46,7 +46,10 @@ class DefaultEngineConnLaunchService extends 
EngineConnLaunchService with Loggin
   @Autowired
   private var engineConnResourceGenerator: EngineConnResourceGenerator = _
 
-  private def getEngineLaunchBuilder(engineTypeLabel: EngineTypeLabel): 
EngineConnLaunchBuilder = {
+  private def getEngineLaunchBuilder(
+      engineTypeLabel: EngineTypeLabel,
+      engineBuildRequest: EngineConnBuildRequest
+  ): EngineConnLaunchBuilder = {
     val engineConnPluginInstance =
       
EngineConnPluginsLoader.getEngineConnPluginsLoader().getEngineConnPlugin(engineTypeLabel)
     val builder = engineConnPluginInstance.plugin.getEngineConnLaunchBuilder
@@ -56,6 +59,7 @@ class DefaultEngineConnLaunchService extends 
EngineConnLaunchService with Loggin
           engineConnResourceGenerator
         )
     }
+    builder.setBuildRequest(engineBuildRequest)
     builder
   }
 
@@ -66,16 +70,14 @@ class DefaultEngineConnLaunchService extends 
EngineConnLaunchService with Loggin
     val engineTypeOption = 
engineBuildRequest.labels.asScala.find(_.isInstanceOf[EngineTypeLabel])
     if (engineTypeOption.isDefined) {
       val engineTypeLabel = engineTypeOption.get.asInstanceOf[EngineTypeLabel]
-      
Utils.tryCatch(getEngineLaunchBuilder(engineTypeLabel).buildEngineConn(engineBuildRequest))
 {
-        t =>
-          logger.error(
-            s"Failed to 
createEngineConnLaunchRequest(${engineBuildRequest.ticketId})",
-            t
-          )
-          throw new EngineConnPluginErrorException(
-            FAILED_CREATE_ELR.getErrorCode,
-            s"${FAILED_CREATE_ELR.getErrorDesc}, 
${ExceptionUtils.getRootCauseMessage(t)}"
-          )
+      Utils.tryCatch(
+        getEngineLaunchBuilder(engineTypeLabel, 
engineBuildRequest).buildEngineConn()
+      ) { t =>
+        logger.error(s"Failed to 
createEngineConnLaunchRequest(${engineBuildRequest.ticketId})", t)
+        throw new EngineConnPluginErrorException(
+          FAILED_CREATE_ELR.getErrorCode,
+          s"${FAILED_CREATE_ELR.getErrorDesc}, 
${ExceptionUtils.getRootCauseMessage(t)}"
+        )
       }
     } else {
       throw new EngineConnPluginErrorException(
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
index 7c53cb6f0..b1880bc8e 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineConnModeLabel.scala
@@ -60,4 +60,9 @@ object EngineConnMode extends Enumeration {
     case _ => Unknown
   }
 
+  val ONCE_MODES = Set(Once, Computation_With_Once, Once_With_Cluster)
+
+  def isOnceMode(ecMode: String): Boolean = {
+    ONCE_MODES.contains(toEngineConnMode(ecMode))
+  }
 }
\ No newline at end of file
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
index 7a3627d3c..b4a66d2f4 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/scala/org/apache/linkis/manager/label/utils/LabelUtil.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.label.utils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{
   CodeLanguageLabel,
+  EngineConnModeLabel,
   EngineTypeLabel,
   UserCreatorLabel
 }
@@ -79,6 +80,20 @@ object LabelUtil {
     getLabelFromList[CodeLanguageLabel](labels)
   }
 
+  def getEngineConnModeLabel(labels: util.List[Label[_]]): EngineConnModeLabel 
= {
+    getLabelFromList[EngineConnModeLabel](labels)
+  }
+
+  def getEngineConnMode(labels: util.List[Label[_]]): String = {
+    if (null == labels) return null
+    val engineConnModeLabel = getEngineConnModeLabel(labels)
+    if (null != engineConnModeLabel) {
+      engineConnModeLabel.getEngineConnMode
+    } else {
+      null
+    }
+  }
+
   def getBindEngineLabel(labels: util.List[Label[_]]): BindEngineLabel = {
     getLabelFromList[BindEngineLabel](labels)
   }
diff --git 
a/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
 
b/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
index 48e1b2b9a..da827e8b8 100644
--- 
a/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
+++ 
b/linkis-engineconn-plugins/openlookeng/src/main/scala/org/apache/linkis/engineplugin/openlookeng/OpenLooKengECPlugin.scala
@@ -55,25 +55,25 @@ class OpenLooKengECPlugin extends EngineConnPlugin {
     this.defaultLabels.add(engineTypeLabel)
   }
 
-  override def getEngineResourceFactory(): EngineResourceFactory = {
+  override def getEngineResourceFactory: EngineResourceFactory = {
     if (null == engineResourceFactory) resourceLocker synchronized {
       engineResourceFactory = new GenericEngineResourceFactory
     }
     engineResourceFactory
   }
 
-  override def getEngineConnLaunchBuilder(): EngineConnLaunchBuilder = {
-    new OpenLooKengProcessECLaunchBuilder;
+  override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
+    new OpenLooKengProcessECLaunchBuilder
   }
 
-  override def getEngineConnFactory(): EngineConnFactory = {
+  override def getEngineConnFactory: EngineConnFactory = {
     if (null == engineFactory) engineFactoryLocker synchronized {
       engineFactory = new OpenLooKengEngineConnFactory
     }
     engineFactory
   }
 
-  override def getDefaultLabels(): util.List[Label[_]] = {
+  override def getDefaultLabels: util.List[Label[_]] = {
     this.defaultLabels
   }
 
diff --git 
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
 
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
index 21ddcffb5..3736f0ef0 100644
--- 
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
+++ 
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/SeatunnelEngineConnPlugin.scala
@@ -51,12 +51,7 @@ class SeatunnelEngineConnPlugin extends EngineConnPlugin {
   }
 
   override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
-    engineLaunchBuilderLocker.synchronized {
-      if (null == engineConnLaunchBuilder) {
-        engineConnLaunchBuilder = new SeatunnelEngineConnLaunchBuilder()
-      }
-      engineConnLaunchBuilder
-    }
+    new SeatunnelEngineConnLaunchBuilder()
   }
 
   override def getEngineConnFactory: EngineConnFactory = {
diff --git a/linkis-engineconn-plugins/spark/pom.xml 
b/linkis-engineconn-plugins/spark/pom.xml
index 6966441de..2bb404ed8 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -63,6 +63,18 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-once-engineconn</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.linkis</groupId>
       <artifactId>linkis-storage</artifactId>
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/ExecutionContext.java
similarity index 72%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to 
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/ExecutionContext.java
index c0e919863..d6d62781c 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/ExecutionContext.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.client.context;
 
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
-  EngineConnBuildRequest,
-  EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
+public class ExecutionContext {
+  private SparkConfig sparkConfig;
 
-  def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): 
EngineConnLaunchRequest
+  public SparkConfig getSparkConfig() {
+    return sparkConfig;
+  }
 
+  public void setSparkConfig(SparkConfig sparkConfig) {
+    this.sparkConfig = sparkConfig;
+  }
 }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
new file mode 100644
index 000000000..c8f784222
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.context;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkConfig {
+
+  private String javaHome; // ("")
+  private String sparkHome; // ("")
+  private String master = "yarn"; // ("yarn")
+  private String deployMode = "client"; // ("client") // todo cluster
+  private String appResource; // ("")
+  // private String mainClass;                            // ("")
+  private String appName; // ("")
+  private String jars; // ("--jars", "")
+  private String packages; // ("--packages", "")
+  private String excludePackages; // ("--exclude-packages", "")
+  private String repositories; // ("--repositories", "")
+  private String files; // ("--files", "")
+  private String archives; // ("--archives", "")
+  private Map<String, String> conf = new HashMap<>(); // ("", "")
+  private String propertiesFile; // ("")
+  private String driverMemory; // ("--driver-memory", "")
+  private String driverJavaOptions; // ("--driver-java-options", "")
+  private String driverLibraryPath; // ("--driver-library-path", "")
+  private String driverClassPath; // ("--driver-class-path", "")
+  private String executorMemory; // ("--executor-memory", "")
+  private String proxyUser; // ("--proxy-user", "")
+  private boolean verbose = false; // (false)
+  private Integer driverCores; // ("--driver-cores", "") // Cluster deploy 
mode only
+  private String totalExecutorCores; // ("--total-executor-cores", "")
+  private Integer executorCores; // ("--executor-cores", "")
+  private Integer numExecutors; // ("--num-executors", "")
+  private String principal; // ("--principal", "")
+  private String keytab; // ("--keytab", "")
+  private String queue; // ("--queue", "")
+
+  public String getJavaHome() {
+    return javaHome;
+  }
+
+  public void setJavaHome(String javaHome) {
+    this.javaHome = javaHome;
+  }
+
+  public String getSparkHome() {
+    return sparkHome;
+  }
+
+  public void setSparkHome(String sparkHome) {
+    this.sparkHome = sparkHome;
+  }
+
+  public String getMaster() {
+    return master;
+  }
+
+  public void setMaster(String master) {
+    this.master = master;
+  }
+
+  public String getDeployMode() {
+    return deployMode;
+  }
+
+  public void setDeployMode(String deployMode) {
+    this.deployMode = deployMode;
+  }
+
+  public String getAppResource() {
+    return appResource;
+  }
+
+  public void setAppResource(String appResource) {
+    this.appResource = appResource;
+  }
+
+  //    public String getMainClass() {
+  //        return mainClass;
+  //    }
+  //
+  //    public void setMainClass(String mainClass) {
+  //        this.mainClass = mainClass;
+  //    }
+
+  public String getAppName() {
+    return appName;
+  }
+
+  public void setAppName(String appName) {
+    this.appName = appName;
+  }
+
+  public String getJars() {
+    return jars;
+  }
+
+  public void setJars(String jars) {
+    this.jars = jars;
+  }
+
+  public String getPackages() {
+    return packages;
+  }
+
+  public void setPackages(String packages) {
+    this.packages = packages;
+  }
+
+  public String getExcludePackages() {
+    return excludePackages;
+  }
+
+  public void setExcludePackages(String excludePackages) {
+    this.excludePackages = excludePackages;
+  }
+
+  public String getRepositories() {
+    return repositories;
+  }
+
+  public void setRepositories(String repositories) {
+    this.repositories = repositories;
+  }
+
+  public String getFiles() {
+    return files;
+  }
+
+  public void setFiles(String files) {
+    this.files = files;
+  }
+
+  public String getArchives() {
+    return archives;
+  }
+
+  public void setArchives(String archives) {
+    this.archives = archives;
+  }
+
+  public Map<String, String> getConf() {
+    return conf;
+  }
+
+  public void setConf(Map<String, String> conf) {
+    this.conf = conf;
+  }
+
+  public String getPropertiesFile() {
+    return propertiesFile;
+  }
+
+  public void setPropertiesFile(String propertiesFile) {
+    this.propertiesFile = propertiesFile;
+  }
+
+  public String getDriverMemory() {
+    return driverMemory;
+  }
+
+  public void setDriverMemory(String driverMemory) {
+    this.driverMemory = driverMemory;
+  }
+
+  public String getDriverJavaOptions() {
+    return driverJavaOptions;
+  }
+
+  public void setDriverJavaOptions(String driverJavaOptions) {
+    this.driverJavaOptions = driverJavaOptions;
+  }
+
+  public String getDriverLibraryPath() {
+    return driverLibraryPath;
+  }
+
+  public void setDriverLibraryPath(String driverLibraryPath) {
+    this.driverLibraryPath = driverLibraryPath;
+  }
+
+  public String getDriverClassPath() {
+    return driverClassPath;
+  }
+
+  public void setDriverClassPath(String driverClassPath) {
+    this.driverClassPath = driverClassPath;
+  }
+
+  public String getExecutorMemory() {
+    return executorMemory;
+  }
+
+  public void setExecutorMemory(String executorMemory) {
+    this.executorMemory = executorMemory;
+  }
+
+  public String getProxyUser() {
+    return proxyUser;
+  }
+
+  public void setProxyUser(String proxyUser) {
+    this.proxyUser = proxyUser;
+  }
+
+  public boolean isVerbose() {
+    return verbose;
+  }
+
+  public void setVerbose(boolean verbose) {
+    this.verbose = verbose;
+  }
+
+  public Integer getDriverCores() {
+    return driverCores;
+  }
+
+  public void setDriverCores(Integer driverCores) {
+    this.driverCores = driverCores;
+  }
+
+  public String getTotalExecutorCores() {
+    return totalExecutorCores;
+  }
+
+  public void setTotalExecutorCores(String totalExecutorCores) {
+    this.totalExecutorCores = totalExecutorCores;
+  }
+
+  public Integer getExecutorCores() {
+    return executorCores;
+  }
+
+  public void setExecutorCores(Integer executorCores) {
+    this.executorCores = executorCores;
+  }
+
+  public Integer getNumExecutors() {
+    return numExecutors;
+  }
+
+  public void setNumExecutors(Integer numExecutors) {
+    this.numExecutors = numExecutors;
+  }
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+  public void setPrincipal(String principal) {
+    this.principal = principal;
+  }
+
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public void setKeytab(String keytab) {
+    this.keytab = keytab;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  /**
+   * Renders the configured fields for logging and debugging.
+   *
+   * <p>String-typed fields are wrapped in single quotes; {@code conf}, {@code verbose} and the
+   * Integer-typed core/executor counts are appended unquoted. Unset fields print as {@code null}.
+   */
+  @Override
+  public String toString() {
+    // mainClass is not rendered: the corresponding field is currently commented out in this class.
+    return "SparkConfig{"
+        + "javaHome='"
+        + javaHome
+        + '\''
+        + ", sparkHome='"
+        + sparkHome
+        + '\''
+        + ", master='"
+        + master
+        + '\''
+        + ", deployMode='"
+        + deployMode
+        + '\''
+        // appResource was previously missing from the output although it is a live field.
+        + ", appResource='"
+        + appResource
+        + '\''
+        + ", appName='"
+        + appName
+        + '\''
+        + ", jars='"
+        + jars
+        + '\''
+        + ", packages='"
+        + packages
+        + '\''
+        + ", excludePackages='"
+        + excludePackages
+        + '\''
+        + ", repositories='"
+        + repositories
+        + '\''
+        + ", files='"
+        + files
+        + '\''
+        + ", archives='"
+        + archives
+        + '\''
+        + ", conf="
+        + conf
+        + ", propertiesFile='"
+        + propertiesFile
+        + '\''
+        + ", driverMemory='"
+        + driverMemory
+        + '\''
+        + ", driverJavaOptions='"
+        + driverJavaOptions
+        + '\''
+        + ", driverLibraryPath='"
+        + driverLibraryPath
+        + '\''
+        + ", driverClassPath='"
+        + driverClassPath
+        + '\''
+        + ", executorMemory='"
+        + executorMemory
+        + '\''
+        + ", proxyUser='"
+        + proxyUser
+        + '\''
+        + ", verbose="
+        + verbose
+        + ", driverCores="
+        + driverCores
+        + ", totalExecutorCores='"
+        + totalExecutorCores
+        + '\''
+        + ", executorCores="
+        + executorCores
+        + ", numExecutors="
+        + numExecutors
+        + ", principal='"
+        + principal
+        + '\''
+        + ", keytab='"
+        + keytab
+        + '\''
+        + ", queue='"
+        + queue
+        + '\''
+        + '}';
+  }
+}
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
new file mode 100644
index 000000000..4607f2e4b
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.Closeable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for adapters that submit a Spark application through a {@link SparkLauncher} and
+ * control it through the resulting {@link SparkAppHandle}.
+ *
+ * <p>This class only reads {@code sparkAppHandle}, {@code applicationId} and {@code jobState};
+ * subclasses are responsible for assigning them when they launch the application.
+ */
+public abstract class ClusterDescriptorAdapter implements Closeable {
+  protected static final Logger logger = LoggerFactory.getLogger(ClusterDescriptorAdapter.class);
+
+  protected final ExecutionContext executionContext;
+  // applicationId and jobState are assigned by subclasses from launcher listener callbacks and
+  // read through the getters below, so they are volatile to keep cross-thread reads up to date.
+  protected volatile String applicationId;
+  protected SparkLauncher sparkLauncher;
+  protected SparkAppHandle sparkAppHandle;
+  protected volatile SparkAppHandle.State jobState;
+
+  /** Returns the application id recorded so far, or {@code null} if none has been reported. */
+  public String getApplicationId() {
+    return applicationId;
+  }
+
+  /** @param executionContext context carrying the Spark configuration used by subclasses */
+  public ClusterDescriptorAdapter(ExecutionContext executionContext) {
+    this.executionContext = executionContext;
+  }
+
+  /** Returns the state of the spark job. */
+  public SparkAppHandle.State getJobState() {
+    return jobState;
+  }
+
+  /** Cancel the spark job. */
+  public void cancelJob() {
+    stopIfLaunched("Start to cancel job {}.", "Cancel job: sparkAppHandle is null");
+  }
+
+  @Override
+  public String toString() {
+    // sparkAppHandle stays null until a subclass launches the application; fall back to the
+    // recorded applicationId instead of dereferencing the handle unconditionally.
+    SparkAppHandle handle = sparkAppHandle;
+    String appId = handle != null ? handle.getAppId() : applicationId;
+    return "ClusterDescriptorAdapter{" + "applicationId=" + appId + '}';
+  }
+
+  /** Stops the job if one was launched; never throws. */
+  @Override
+  public void close() {
+    stopIfLaunched("Start to close job {}.", "Close job: sparkAppHandle is null");
+  }
+
+  /**
+   * Shared body of {@link #cancelJob()} and {@link #close()}: logs and stops the job when a handle
+   * exists, otherwise only logs a warning.
+   */
+  private void stopIfLaunched(String startMessage, String missingHandleMessage) {
+    if (sparkAppHandle != null) {
+      logger.info(startMessage, sparkAppHandle.getAppId());
+      this.stopJob();
+    } else {
+      logger.warn(missingHandleMessage);
+    }
+  }
+
+  /**
+   * Asks the handle to stop the application; if that call throws, falls back to {@code kill()}.
+   * Does nothing when there is no handle or the application already reached a final state.
+   */
+  private void stopJob() {
+    if (sparkAppHandle == null) {
+      return;
+    }
+    if (sparkAppHandle.getState().isFinal()) {
+      logger.info("Job has finished, stop action return.");
+      return;
+    }
+    try {
+      logger.info("Try to stop job {}.", sparkAppHandle.getAppId());
+      sparkAppHandle.stop();
+    } catch (Exception e) {
+      logger.error("Stop job failed.", e);
+      try {
+        logger.info("Try to kill job {}.", sparkAppHandle.getAppId());
+        sparkAppHandle.kill();
+      } catch (Exception ex) {
+        logger.error("Kill job failed.", ex);
+      }
+    }
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
similarity index 64%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to 
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
index c0e919863..8e9540dcb 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/ClusterDescriptorAdapterFactory.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.client.deployment;
 
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
-  EngineConnBuildRequest,
-  EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
 
-  def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): 
EngineConnLaunchRequest
+public class ClusterDescriptorAdapterFactory {
 
+  /**
+   * Creates the adapter used to submit a Spark application.
+   *
+   * @param executionContext context handed to the adapter's constructor
+   * @return a new {@link YarnApplicationClusterDescriptorAdapter}
+   */
+  public static ClusterDescriptorAdapter create(ExecutionContext executionContext) {
+    return new YarnApplicationClusterDescriptorAdapter(executionContext);
+  }
 }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
new file mode 100644
index 000000000..1cdfb374e
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.client.deployment;
+
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext;
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Submits a Spark application through {@link SparkLauncher} using the {@link SparkConfig} held by
+ * the {@link ExecutionContext}, and records the application id and state the launcher reports.
+ */
+public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAdapter {
+
+  public YarnApplicationClusterDescriptorAdapter(ExecutionContext executionContext) {
+    super(executionContext);
+  }
+
+  /**
+   * Configures the launcher from the Spark configuration, starts the application and blocks until
+   * the launcher reports either an application id or a final state.
+   *
+   * @param mainClass main class handed to the launcher
+   * @param args whitespace-separated application arguments; may be {@code null} or blank
+   * @param confMap extra entries applied with {@code setConf}; may be {@code null}
+   * @throws IOException propagated from {@code SparkLauncher.startApplication}
+   * @throws InterruptedException if interrupted while waiting for the first usable update
+   */
+  public void deployCluster(String mainClass, String args, Map<String, String> confMap)
+      throws IOException, InterruptedException {
+    SparkConfig sparkConfig = executionContext.getSparkConfig();
+
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    SparkAppHandle.Listener listener =
+        new SparkAppHandle.Listener() {
+          @Override
+          public void stateChanged(SparkAppHandle handle) {
+            SparkAppHandle.State state = handle.getState();
+            jobState = state;
+            String appId = handle.getAppId();
+            if (appId != null) {
+              // Record the id BEFORE releasing the latch, so a caller woken from deployCluster()
+              // can never observe a null applicationId for an application that did start.
+              applicationId = appId;
+              countDownLatch.countDown();
+              logger.info("{} stateChanged: {}", appId, state.toString());
+            } else {
+              // No id yet: only unblock the caller once the application can no longer get one.
+              if (state.isFinal()) {
+                countDownLatch.countDown();
+              }
+              logger.info("stateChanged: {}", state.toString());
+            }
+          }
+
+          @Override
+          public void infoChanged(SparkAppHandle handle) {
+            SparkAppHandle.State state = handle.getState();
+            jobState = state;
+            String appId = handle.getAppId();
+            if (appId != null) {
+              logger.info("{} infoChanged: {}", appId, state.toString());
+            } else {
+              logger.info("infoChanged: {}", state.toString());
+            }
+          }
+        };
+
+    sparkLauncher = new SparkLauncher();
+    // region set args
+    sparkLauncher
+        .setJavaHome(sparkConfig.getJavaHome())
+        .setSparkHome(sparkConfig.getSparkHome())
+        .setMaster(sparkConfig.getMaster())
+        .setDeployMode(sparkConfig.getDeployMode())
+        .setAppName(sparkConfig.getAppName())
+        // .setPropertiesFile("")
+        .setVerbose(true);
+    sparkLauncher.setConf("spark.app.name", sparkConfig.getAppName());
+    if (confMap != null) confMap.forEach((k, v) -> sparkLauncher.setConf(k, v));
+    addSparkArg(sparkLauncher, "--jars", sparkConfig.getJars());
+    addSparkArg(sparkLauncher, "--packages", sparkConfig.getPackages());
+    addSparkArg(sparkLauncher, "--exclude-packages", sparkConfig.getExcludePackages());
+    addSparkArg(sparkLauncher, "--repositories", sparkConfig.getRepositories());
+    addSparkArg(sparkLauncher, "--files", sparkConfig.getFiles());
+    addSparkArg(sparkLauncher, "--archives", sparkConfig.getArchives());
+    addSparkArg(sparkLauncher, "--driver-memory", sparkConfig.getDriverMemory());
+    addSparkArg(sparkLauncher, "--driver-java-options", sparkConfig.getDriverJavaOptions());
+    addSparkArg(sparkLauncher, "--driver-library-path", sparkConfig.getDriverLibraryPath());
+    addSparkArg(sparkLauncher, "--driver-class-path", sparkConfig.getDriverClassPath());
+    addSparkArg(sparkLauncher, "--executor-memory", sparkConfig.getExecutorMemory());
+    addSparkArg(sparkLauncher, "--proxy-user", sparkConfig.getProxyUser());
+    // The three Integer-typed settings are null unless explicitly configured; the Integer
+    // overload skips them instead of failing on toString().
+    addSparkArg(sparkLauncher, "--driver-cores", sparkConfig.getDriverCores());
+    addSparkArg(sparkLauncher, "--total-executor-cores", sparkConfig.getTotalExecutorCores());
+    addSparkArg(sparkLauncher, "--executor-cores", sparkConfig.getExecutorCores());
+    addSparkArg(sparkLauncher, "--num-executors", sparkConfig.getNumExecutors());
+    addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
+    addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
+    addSparkArg(sparkLauncher, "--queue", sparkConfig.getQueue());
+    sparkLauncher.setAppResource(sparkConfig.getAppResource());
+    sparkLauncher.setMainClass(mainClass);
+    // A job without arguments is valid, so tolerate a null argument string.
+    if (args != null) {
+      Arrays.stream(args.split("\\s+"))
+          .filter(StringUtils::isNotBlank)
+          .forEach(arg -> sparkLauncher.addAppArgs(arg));
+    }
+    // endregion
+    sparkAppHandle = sparkLauncher.startApplication(listener);
+    countDownLatch.await();
+  }
+
+  /** Adds {@code key value} to the launcher only when both are non-blank. */
+  private void addSparkArg(SparkLauncher sparkLauncher, String key, String value) {
+    if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+      sparkLauncher.addSparkArg(key, value);
+    }
+  }
+
+  /** Integer variant of {@code addSparkArg}; an unset ({@code null}) value is skipped. */
+  private void addSparkArg(SparkLauncher sparkLauncher, String key, Integer value) {
+    if (value != null) {
+      addSparkArg(sparkLauncher, key, value.toString());
+    }
+  }
+
+  /** Returns {@code true} once an application id has been recorded. */
+  public boolean initJobId() {
+    return null != getApplicationId();
+  }
+}
diff --git 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
index 084b7bcae..96c5f4dd3 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
@@ -55,6 +55,13 @@ public enum SparkErrorCodeSummary {
       "{0} requires that the data to be inserted have the same number of 
columns as the target table: target table has {1} column(s) but the inserted 
data has {2} column(s)"),
   DATA_CALC_FIELD_NOT_EXIST(43023, "{0} columns({1}) are not exist in source 
columns"),
   DATA_CALC_VARIABLE_NOT_EXIST(43024, "Please set [{0}] in variables"),
+
+  NOT_SUPPORT_ADAPTER(43031, "Not support Adapter for spark application."),
+
+  YARN_APPLICATION_START_FAILED(
+      43032, "The application start failed, since yarn applicationId is 
null."),
+
+  NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."),
   ;
 
   /** (errorCode)错误码 */
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/ExecutorInitException.java
similarity index 72%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to 
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/ExecutorInitException.java
index c0e919863..bc3180d54 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/ExecutorInitException.java
@@ -15,15 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.exception;
 
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
-  EngineConnBuildRequest,
-  EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
-
-  def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): 
EngineConnLaunchRequest
+import org.apache.linkis.common.exception.ErrorException;
 
+public class ExecutorInitException extends ErrorException {
+  public ExecutorInitException(int errCode, String desc) {
+    super(errCode, desc);
+  }
 }
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/JobExecutionException.java
similarity index 72%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to 
linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/JobExecutionException.java
index c0e919863..dfc70a283 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/exception/JobExecutionException.java
@@ -15,15 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.exception;
 
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
-  EngineConnBuildRequest,
-  EngineConnLaunchRequest
-}
-
-trait EngineConnLaunchBuilder {
-
-  def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): 
EngineConnLaunchRequest
+import org.apache.linkis.common.exception.ErrorException;
 
+public class JobExecutionException extends ErrorException {
+  public JobExecutionException(int errCode, String desc) {
+    super(errCode, desc);
+  }
 }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
 
b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
index 06052244f..3de8a6512 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
+++ 
b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
@@ -26,4 +26,6 @@ 
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.spark.
 
 
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook
 
+linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088
+
 wds.linkis.fs.hdfs.impl.disable.cache=true
\ No newline at end of file
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
index 67cfb49ad..0ef8ba81b 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/SparkEngineConnPlugin.scala
@@ -21,13 +21,13 @@ import org.apache.linkis.engineplugin.spark.factory.{
   SparkEngineConnFactory,
   SparkEngineConnResourceFactory
 }
-import 
org.apache.linkis.engineplugin.spark.launch.SparkSubmitProcessEngineConnLaunchBuilder
+import org.apache.linkis.engineplugin.spark.launch.SparkEngineConnLaunchBuilder
 import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
 import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
 import 
org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
 import 
org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory
 import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.{EngineType, 
EngineTypeLabel}
+import org.apache.linkis.manager.label.entity.engine.EngineType
 import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
 
 import java.util
@@ -61,7 +61,7 @@ class SparkEngineConnPlugin extends EngineConnPlugin {
   }
 
   override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
-    SparkSubmitProcessEngineConnLaunchBuilder.newBuilder()
+    new SparkEngineConnLaunchBuilder()
   }
 
   override def getEngineConnFactory: EngineConnFactory = {
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index 20cc9006a..c4370c7eb 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -27,6 +27,9 @@ import java.io.File
  */
 object SparkConfiguration extends Logging {
 
+  val SPARK_HOME_ENV = "SPARK_HOME"
+  val SPARK_CONF_DIR_ENV = "SPARK_CONF_DIR"
+
   val PROCESS_MAX_THREADS = 
CommonVars[Int]("wds.linkis.process.threadpool.max", 100)
 
   val SPARK_SESSION_HOOK = 
CommonVars[String]("wds.linkis.engine.spark.session.hook", "")
@@ -39,11 +42,13 @@ object SparkConfiguration extends Logging {
 
   val SPARK_REPL_CLASSDIR = CommonVars[String]("spark.repl.classdir", "", 
"默认master")
 
-  val PROXY_USER = CommonVars[String]("spark.proxy.user", "${UM}")
+  val PROXY_USER = CommonVars[String]("spark.proxy.user", "")
 
   val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode", 
"client")
 
   val SPARK_APP_NAME = CommonVars[String]("spark.app.name", 
"Linkis-EngineConn-Spark")
+  val SPARK_APP_RESOURCE = CommonVars[String]("spark.app.resource", "")
+  val SPARK_APP_CONF = CommonVars[String]("spark.extconf", "")
 
   val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", 
"python")
 
@@ -121,6 +126,17 @@ object SparkConfiguration extends Logging {
 
   val REPLACE_PACKAGE_TO_HEADER = "org.apache.linkis"
 
+  val SPARK_APPLICATION_ARGS = CommonVars("spark.app.args", "")
+  val SPARK_APPLICATION_MAIN_CLASS = CommonVars("spark.app.main.class", "")
+
+  val SPARK_ONCE_APP_STATUS_FETCH_INTERVAL =
+    CommonVars("linkis.spark.once.app.fetch.status.interval", new 
TimeType("5s"))
+
+  val SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX =
+    CommonVars("linkis.spark.once.app.fetch.status.failed.num", 3)
+
+  val SPARK_ONCE_YARN_RESTFUL_URL = 
CommonVars[String]("linkis.spark.once.yarn.restful.url", "")
+
   private def getMainJarName(): String = {
     val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory])
     if (somePath.isDefined) {
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/EnvironmentContext.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/EnvironmentContext.scala
new file mode 100644
index 000000000..153a72632
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/EnvironmentContext.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.context
+
+import org.apache.linkis.engineplugin.spark.client.context.SparkConfig
+
+import java.net.URL
+import java.util
+import java.util.Objects
+
+/**
+ * Immutable-looking holder for the directories and dependencies a Spark submission needs,
+ * plus an optional [[SparkConfig]] that is only set through the auxiliary constructor.
+ *
+ * Equality and hashing consider `dependencies` and `sparkConfig` only; the three directory
+ * strings do not take part in either.
+ */
+class EnvironmentContext(
+    yarnConfDir: String,
+    sparkConfDir: String,
+    sparkHome: String,
+    dependencies: util.List[URL]
+) {
+
+  // Stays null when the primary constructor is used directly.
+  private var sparkConfig: SparkConfig = _
+
+  /** Same as the primary constructor, additionally recording `sparkConfig`. */
+  def this(
+      sparkConfig: SparkConfig,
+      yarnConfDir: String,
+      sparkConfDir: String,
+      sparkHome: String,
+      dependencies: util.List[URL]
+  ) = {
+    this(yarnConfDir, sparkConfDir, sparkHome, dependencies)
+    this.sparkConfig = sparkConfig
+  }
+
+  def getYarnConfDir: String = yarnConfDir
+
+  def getSparkConfDir: String = sparkConfDir
+
+  def getSparkHome: String = sparkHome
+
+  def getSparkConfig: SparkConfig = sparkConfig
+
+  def getDependencies: util.List[URL] = dependencies
+
+  /** Identity short-circuits; otherwise both `dependencies` and `sparkConfig` must be equal. */
+  override def equals(o: Any): Boolean = o match {
+    case that: EnvironmentContext =>
+      (this eq that) ||
+        (Objects.equals(dependencies, that.getDependencies) &&
+          Objects.equals(sparkConfig, that.sparkConfig))
+    case _ => false
+  }
+
+  /** Hashes exactly the fields that `equals` compares. */
+  override def hashCode: Int = Objects.hash(dependencies, sparkConfig)
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/SparkEngineConnContext.scala
similarity index 61%
copy from 
linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
copy to 
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/SparkEngineConnContext.scala
index c0e919863..4b1e6a2eb 100644
--- 
a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/EngineConnLaunchBuilder.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/context/SparkEngineConnContext.scala
@@ -15,15 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.common.launch
+package org.apache.linkis.engineplugin.spark.context
 
-import org.apache.linkis.manager.engineplugin.common.launch.entity.{
-  EngineConnBuildRequest,
-  EngineConnLaunchRequest
-}
+import org.apache.linkis.engineplugin.spark.client.context.ExecutionContext
+
+/**
+ * Pairs the fixed [[EnvironmentContext]] given at construction with an
+ * [[ExecutionContext]] that is attached afterwards through `setExecutionContext`.
+ */
+class SparkEngineConnContext(environmentContext: EnvironmentContext) {
+
+  // Not assigned at construction; stays null until setExecutionContext is called.
+  private var executionContext: ExecutionContext = _
+
+  def getEnvironmentContext: EnvironmentContext = environmentContext
 
-trait EngineConnLaunchBuilder {
+  def getExecutionContext: ExecutionContext = executionContext
 
-  def buildEngineConn(engineConnBuildRequest: EngineConnBuildRequest): 
EngineConnLaunchRequest
+  def setExecutionContext(executionContext: ExecutionContext): Unit = 
this.executionContext =
+    executionContext
 
 }
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkExecutor.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkExecutor.scala
new file mode 100644
index 000000000..dad6ef2ed
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkExecutor.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.engineconn.executor.entity.{LabelExecutor, 
ResourceExecutor, YarnExecutor}
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.engineplugin.spark.exception.JobExecutionException
+import org.apache.linkis.manager.common.entity.resource._
+import org.apache.linkis.manager.label.entity.Label
+
+import java.util
+
+/**
+ * Common state for Spark executors in this plugin: the YARN application id, YARN mode,
+ * queue and executor labels, each exposed through a plain getter/setter pair.
+ */
+trait SparkExecutor extends YarnExecutor with LabelExecutor with 
ResourceExecutor {
+
+  // Not assigned here; populated later via setApplicationId.
+  private var applicationId: String = _
+  // Reported as "Client" until setYarnMode is called.
+  private var yarnMode: String = "Client"
+  // Overwritten from the SparkConfig further down in this trait body.
+  private var queue: String = _
+
+  private var executorLabels: util.List[Label[_]] = new 
util.ArrayList[Label[_]]
+
+  override def getApplicationId: String = applicationId
+
+  def setApplicationId(applicationId: String): Unit = this.applicationId = 
applicationId
+
+  override def getYarnMode: String = yarnMode
+
+  def setYarnMode(yarnMode: String): Unit = this.yarnMode = yarnMode
+
+  override def getQueue: String = queue
+
+  def setQueue(queue: String): Unit = this.queue = queue
+
+  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
+
+  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = 
this.executorLabels = labels
+
+  // Not supported by this executor: always throws a JobExecutionException built from
+  // SparkErrorCodeSummary.NOT_SUPPORT_METHOD and never returns a resource.
+  override def requestExpectedResource(expectedResource: NodeResource): 
NodeResource =
+    throw new JobExecutionException(
+      SparkErrorCodeSummary.NOT_SUPPORT_METHOD.getErrorCode,
+      SparkErrorCodeSummary.NOT_SUPPORT_METHOD.getErrorDesc
+    )
+
+  // Supplied by the concrete executor.
+  protected val sparkEngineConnContext: SparkEngineConnContext
+
+  // Executed as part of this trait's initialisation: copies the queue from the SparkConfig
+  // held by the environment context.
+  // NOTE(review): this dereferences sparkEngineConnContext while the instance is still being
+  // constructed, so implementors presumably have to provide it as a constructor parameter
+  // (not as a val defined in their class body) to avoid a NullPointerException - confirm.
+  queue = sparkEngineConnContext.getEnvironmentContext.getSparkConfig.getQueue
+
+  def supportCallBackLogs(): Boolean = true
+}
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
new file mode 100644
index 000000000..faa90b03a
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkOnceExecutor.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.once.executor.{
+  ManageableOnceExecutor,
+  OnceExecutorExecutionContext
+}
+import org.apache.linkis.engineplugin.spark.client.deployment.{
+  ClusterDescriptorAdapter,
+  ClusterDescriptorAdapterFactory
+}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+  SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX,
+  SPARK_ONCE_APP_STATUS_FETCH_INTERVAL
+}
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.engineplugin.spark.exception.ExecutorInitException
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+import org.apache.spark.launcher.SparkAppHandle
+
+import java.util.concurrent.{Future, TimeUnit}
+
+import scala.collection.convert.WrapAsScala._
+
+/**
+ * Once-mode Spark executor skeleton: creates a cluster descriptor adapter, delegates the
+ * actual submission to `doSubmit`, records the resulting application id and then polls the
+ * job state on a scheduler until the job reaches a terminal state.
+ *
+ * @tparam T the kind of [[ClusterDescriptorAdapter]] the concrete executor works with
+ */
+trait SparkOnceExecutor[T <: ClusterDescriptorAdapter]
+    extends ManageableOnceExecutor
+    with SparkExecutor {
+
+  // Assigned in submit(); null until then, which is why close() guards against null.
+  protected var clusterDescriptorAdapter: T = _
+  // Handle of the status-polling task started by waitToRunning(); null if never started.
+  private var daemonThread: Future[_] = _
+
+  /**
+   * Creates the adapter, converts the job content into a String-to-String map, calls
+   * `doSubmit`, and stores the application id reported by the adapter.
+   *
+   * Throws ExecutorInitException when the adapter does not match the expected type or when
+   * no application id is available after submission (unless the executor already completed).
+   */
+  protected def submit(onceExecutorExecutionContext: 
OnceExecutorExecutionContext): Unit = {
+    // NOTE(review): T is an abstract type parameter, so the `case adapter: T` pattern below
+    // is presumably unchecked at runtime because of erasure - confirm whether the
+    // NOT_SUPPORT_ADAPTER branch can actually be reached for a non-null adapter.
+    
ClusterDescriptorAdapterFactory.create(sparkEngineConnContext.getExecutionContext)
 match {
+      case adapter: T => clusterDescriptorAdapter = adapter
+      case _ =>
+        throw new ExecutorInitException(
+          SparkErrorCodeSummary.NOT_SUPPORT_ADAPTER.getErrorCode,
+          SparkErrorCodeSummary.NOT_SUPPORT_ADAPTER.getErrorDesc
+        )
+    }
+    // Normalise job content values to String: Strings pass through, other non-null values
+    // go through toString, and null values stay null.
+    val options = 
onceExecutorExecutionContext.getOnceExecutorContent.getJobContent.map {
+      case (k, v: String) => k -> v
+      case (k, v) if v != null => k -> v.toString
+      case (k, _) => k -> null
+    }.toMap
+    doSubmit(onceExecutorExecutionContext, options)
+    // Skip the application-id check if the executor was closed or completed during doSubmit.
+    if (isCompleted) return
+    if (null == clusterDescriptorAdapter.getApplicationId)
+      throw new ExecutorInitException(
+        SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorCode,
+        SparkErrorCodeSummary.YARN_APPLICATION_START_FAILED.getErrorDesc
+      )
+    setApplicationId(clusterDescriptorAdapter.getApplicationId)
+    logger.info(s"Application is started, applicationId: $getApplicationId.")
+  }
+
+  // True once the executor is closed or its node status is a completed one.
+  protected def isCompleted: Boolean = isClosed || 
NodeStatus.isCompleted(getStatus)
+
+  /**
+   * Performs the engine-specific submission.
+   *
+   * @param onceExecutorExecutionContext the once-executor context passed to `submit`
+   * @param options job content flattened to String values (values may be null)
+   */
+  def doSubmit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext,
+      options: Map[String, String]
+  ): Unit
+
+  val id: Long
+
+  override def getId: String = "SparkOnceApp_" + id
+
+  // Cancels the polling task (interrupting it if running); no-op when none was started.
+  protected def closeDaemon(): Unit = {
+    if (daemonThread != null) daemonThread.cancel(true)
+  }
+
+  // Closes the parent executor, stops polling, then closes the adapter if one was created.
+  override def close(): Unit = {
+    super.close()
+    closeDaemon()
+    if (clusterDescriptorAdapter != null) {
+      clusterDescriptorAdapter.close()
+    }
+  }
+
+  /**
+   * Starts the periodic job-state poller unless the executor has already completed.
+   *
+   * Each run fetches the job state, logs it when it changed or the print interval elapsed,
+   * and calls tryFailed()/trySucceed() when a terminal state is observed. Consecutive fetch
+   * failures are counted; once the configured maximum is reached the executor is failed
+   * and closed.
+   */
+  override protected def waitToRunning(): Unit = {
+    // Set while a run is sleeping after FINISHED; makes other runs skip their body.
+    var waitingToFinished = false
+    if (!isCompleted)
+      daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(
+        new Runnable {
+          private var lastStatus: SparkAppHandle.State = _
+          private var lastPrintTime = 0L
+          // Unchanged states are re-logged at most this often: the larger of the fetch
+          // interval value and 5 * 60 * 1000.
+          private val printInterval =
+            math.max(SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong, 5 * 
60 * 1000)
+          private var fetchJobStatusFailedNum = 0
+
+          override def run(): Unit = if (!isCompleted && !waitingToFinished) {
+            // On a fetch failure the handler either gives up (fail + close) or bumps the
+            // failure counter; in both cases the `return` below leaves run() for this tick.
+            val jobState = 
Utils.tryCatch(clusterDescriptorAdapter.getJobState) { t =>
+              val maxFailedNum = 
SPARK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue
+              if (fetchJobStatusFailedNum >= maxFailedNum) {
+                val errMsg =
+                  s"Fetch job status has failed max $maxFailedNum times, now 
stop this SparkEngineConn."
+                logger.error(errMsg, t)
+                tryFailed()
+                close()
+              } else {
+                fetchJobStatusFailedNum += 1
+                logger.error(s"Fetch job status failed! retried 
++$fetchJobStatusFailedNum...", t)
+              }
+              return
+            }
+
+            // A successful fetch resets the consecutive-failure counter.
+            fetchJobStatusFailedNum = 0
+            if (
+                jobState != lastStatus || System.currentTimeMillis - 
lastPrintTime >= printInterval
+            ) {
+              logger.info(s"The jobState of $getApplicationId is $jobState.")
+              lastPrintTime = System.currentTimeMillis
+            }
+            lastStatus = jobState
+            // After FINISHED, wait 5 seconds and read the state again before deciding below.
+            if (SparkAppHandle.State.FINISHED == lastStatus) {
+              waitingToFinished = true
+              logger.info("Job has finished, waiting for final status.")
+              Thread.sleep(5000)
+              logger.info(s"Job's final status 
${clusterDescriptorAdapter.getJobState}.")
+            }
+            // The state is fetched again here (outside the tryCatch above) to pick the outcome.
+            clusterDescriptorAdapter.getJobState match {
+              case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED |
+                  SparkAppHandle.State.LOST =>
+                tryFailed()
+              case SparkAppHandle.State.FINISHED =>
+                trySucceed()
+              case _ =>
+            }
+            waitingToFinished = false
+          }
+        },
+        // Initial delay and period both use the configured fetch interval, passed as
+        // milliseconds.
+        SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
+        SPARK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong,
+        TimeUnit.MILLISECONDS
+      )
+  }
+
+  override def supportCallBackLogs(): Boolean = true
+
+}
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
new file mode 100644
index 000000000..8cec44d00
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSubmitOnceExecutor.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.utils.{ByteTimeUtils, Utils}
+import org.apache.linkis.engineconn.once.executor.{
+  OnceExecutorExecutionContext,
+  OperableOnceExecutor
+}
+import 
org.apache.linkis.engineplugin.spark.client.deployment.YarnApplicationClusterDescriptorAdapter
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+  SPARK_APP_CONF,
+  SPARK_APPLICATION_ARGS,
+  SPARK_APPLICATION_MAIN_CLASS
+}
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.utils.SparkJobProgressUtil
+import org.apache.linkis.manager.common.entity.resource._
+import org.apache.linkis.manager.common.utils.ResourceUtils
+import org.apache.linkis.protocol.engine.JobProgressInfo
+
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+import scala.concurrent.duration.Duration
+
+/**
+ * Once-mode executor that submits a Spark application (main class plus arguments) through a
+ * [[YarnApplicationClusterDescriptorAdapter]] and reports configured resources and progress.
+ *
+ * @param id numeric id of this executor
+ * @param sparkEngineConnContext context holding the Spark configuration of this submission
+ */
+class SparkSubmitOnceExecutor(
+    override val id: Long,
+    override protected val sparkEngineConnContext: SparkEngineConnContext
+) extends SparkOnceExecutor[YarnApplicationClusterDescriptorAdapter]
+    with OperableOnceExecutor {
+
+  // Highest progress reported so far; getProgress never reports less than this.
+  private var oldProgress: Float = 0f
+
+  /**
+   * Reads the main class, the arguments and the extra Spark conf from `options` and deploys
+   * the application through the cluster descriptor adapter.
+   *
+   * The extra conf is a newline-separated list of `key=value` entries. Each entry is split
+   * on its first `=` only, so values that contain `=` themselves (for example
+   * `spark.driver.extraJavaOptions=-Dfoo=bar`) are passed on intact instead of being
+   * dropped. Entries without `=` or with an empty value are logged and ignored.
+   *
+   * @param onceExecutorExecutionContext the once-executor context (not used here)
+   * @param options job content as String key/value pairs
+   */
+  override def doSubmit(
+      onceExecutorExecutionContext: OnceExecutorExecutionContext,
+      options: Map[String, String]
+  ): Unit = {
+    val args = SPARK_APPLICATION_ARGS.getValue(options)
+    val mainClass = SPARK_APPLICATION_MAIN_CLASS.getValue(options)
+    val extConf = SPARK_APP_CONF.getValue(options)
+    val confMap = new util.HashMap[String, String]()
+    if (StringUtils.isNotBlank(extConf)) {
+      for (conf <- extConf.split("\n")) {
+        if (StringUtils.isNotBlank(conf)) {
+          // Limit 2: everything after the first '=' belongs to the value.
+          val pair = conf.trim.split("=", 2)
+          if (pair.length == 2 && pair(1).nonEmpty) {
+            confMap.put(pair(0), pair(1))
+          } else {
+            logger.warn(s"ignore spark conf: $conf")
+          }
+        }
+      }
+    }
+    logger.info(s"${System.getenv("YARN_RESTFUL_URL")}")
+    logger.info(s"Ready to submit spark application, mainClass: $mainClass, args: $args.")
+    clusterDescriptorAdapter.deployCluster(mainClass, args, confMap)
+  }
+
+  /** Waits (without a time limit) until the adapter has a job id, then starts polling. */
+  override protected def waitToRunning(): Unit = {
+    Utils.waitUntil(() => clusterDescriptorAdapter.initJobId(), Duration.Inf)
+    super.waitToRunning()
+  }
+
+  override def getApplicationURL: String = ""
+
+  /**
+   * Builds the node resource from the values in the Spark configuration (driver and
+   * executor memory/cores, executor count, queue); nothing is measured at runtime.
+   *
+   * @return the resource description, or null when reading the configuration fails
+   */
+  override def getCurrentNodeResource(): NodeResource = {
+    logger.info("Begin to get actual used resources!")
+    Utils.tryCatch({
+      val sparkConf = sparkEngineConnContext.getExecutionContext.getSparkConfig
+      val executorNum: Int = sparkConf.getNumExecutors
+      val executorMem: Long =
+        ByteTimeUtils.byteStringAsBytes(sparkConf.getExecutorMemory) * executorNum
+
+      val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sparkConf.getDriverMemory)
+      val sparkExecutorCores = sparkConf.getExecutorCores * executorNum
+      val sparkDriverCores = sparkConf.getDriverCores
+      val queue = sparkConf.getQueue
+      logger.info(
+        s"Current actual used resources is driverMem:$driverMem,driverCores:$sparkDriverCores" +
+          s",executorMem:$executorMem,executorCores:$sparkExecutorCores,queue:$queue"
+      )
+      val usedResource = new DriverAndYarnResource(
+        new LoadInstanceResource(driverMem, sparkDriverCores, 1),
+        new YarnResource(executorMem, sparkExecutorCores, 0, queue)
+      )
+      val nodeResource = new CommonNodeResource
+      nodeResource.setUsedResource(usedResource)
+      nodeResource.setResourceType(ResourceUtils.getResourceTypeByResource(usedResource))
+      nodeResource
+    })(t => {
+      logger.warn("Get actual used resource exception", t)
+      null
+    })
+  }
+
+  /**
+   * Reports 1 once the job state is final or a progress of at least 1 was already seen;
+   * otherwise asks SparkJobProgressUtil and returns the highest value observed so far.
+   */
+  override def getProgress: Float = {
+    if (clusterDescriptorAdapter == null) {
+      logger.info("clusterDescriptorAdapter is null")
+    } else if (clusterDescriptorAdapter.getJobState == null) {
+      logger.info("clusterDescriptorAdapter.getJobState is null")
+    } else {
+      logger.info("clusterDescriptorAdapter/getJobState is not null")
+    }
+    val jobIsFinal = clusterDescriptorAdapter != null &&
+      clusterDescriptorAdapter.getJobState != null &&
+      clusterDescriptorAdapter.getJobState.isFinal
+    if (oldProgress >= 1 || jobIsFinal) {
+      1
+    } else {
+      val newProgress = SparkJobProgressUtil.getProgress(this.getApplicationId)
+      // Keep the reported progress monotonic.
+      if (newProgress > oldProgress) {
+        oldProgress = newProgress
+      }
+      oldProgress
+    }
+  }
+
+  override def getProgressInfo: Array[JobProgressInfo] =
+    SparkJobProgressUtil.getSparkJobProgressInfo(this.getApplicationId)
+
+  /** No metrics are collected; always returns a new empty map. */
+  override def getMetrics: util.Map[String, Any] = {
+    new util.HashMap[String, Any]()
+  }
+
+  /** No diagnosis data is collected; always returns a new empty map. */
+  override def getDiagnosis: util.Map[String, Any] = {
+    new util.HashMap[String, Any]()
+  }
+
+}
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index d45f8b6ac..ec3b697ea 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -18,19 +18,27 @@
 package org.apache.linkis.engineplugin.spark.factory
 
 import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.common.utils.{JsonUtils, Logging, Utils}
 import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import org.apache.linkis.engineplugin.spark.client.context.{ExecutionContext, 
SparkConfig}
 import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration._
+import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
+import org.apache.linkis.engineplugin.spark.context.{EnvironmentContext, 
SparkEngineConnContext}
 import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
 import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
 import org.apache.linkis.engineplugin.spark.exception.{
   SparkCreateFileException,
   SparkSessionNullException
 }
+import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
 import org.apache.linkis.manager.engineplugin.common.creation.{
   ExecutorFactory,
   MultiExecutorEngineConnFactory
 }
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
+import 
org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
 import org.apache.linkis.manager.label.entity.engine.EngineType
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
 import org.apache.linkis.server.JMap
@@ -51,8 +59,72 @@ class SparkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
   override protected def createEngineConnSession(
       engineCreationContext: EngineCreationContext
   ): Any = {
+    // The session type depends on the engine mode: once mode yields a
+    // SparkEngineConnContext, every other mode yields a SparkEngineSession.
+    if (EngineConnServer.isOnceMode) {
+      createSparkOnceEngineConnContext(engineCreationContext)
+    } else {
+      createSparkEngineSession(engineCreationContext)
+    }
+  }
+
+  /**
+   * Builds the once-mode session: an environment context derived from the creation
+   * context, wrapped in a [[SparkEngineConnContext]] with its execution context attached.
+   */
+  def createSparkOnceEngineConnContext(
+      engineCreationContext: EngineCreationContext
+  ): SparkEngineConnContext = {
+    val envContext = createEnvironmentContext(engineCreationContext)
+    val onceContext = new SparkEngineConnContext(envContext)
+    onceContext.setExecutionContext(
+      createExecutionContext(engineCreationContext.getOptions, envContext)
+    )
+    onceContext
+  }
+
+  /**
+   * Resolves the Hadoop conf dir, Spark home and Spark conf dir from the creation options
+   * and bundles them with the derived [[SparkConfig]]. No dependency list is supplied
+   * (null is passed).
+   */
+  protected def createEnvironmentContext(
+      engineCreationContext: EngineCreationContext
+  ): EnvironmentContext = {
+    val opts = engineCreationContext.getOptions
+    val yarnConfDir = EnvConfiguration.HADOOP_CONF_DIR.getValue(opts)
+    val home = SPARK_HOME.getValue(opts)
+    val confDir = SPARK_CONF_DIR.getValue(opts)
+    val config: SparkConfig = getSparkConfig(opts)
+    new EnvironmentContext(config, yarnConfDir, confDir, home, null)
+  }
+
+  /**
+   * Populates a new [[SparkConfig]] from the given options, one setter per configuration
+   * entry, logging the raw options before and the resulting config after.
+   *
+   * @param options engine creation options used to resolve every configuration value
+   * @return the populated configuration
+   */
+  def getSparkConfig(options: util.Map[String, String]): SparkConfig = {
+    logger.info(s"options: ${JsonUtils.jackson.writeValueAsString(options)}")
+    val config: SparkConfig = new SparkConfig()
+
+    // Runtime locations and submission target.
+    config.setJavaHome(variable(Environment.JAVA_HOME))
+    config.setSparkHome(SPARK_HOME.getValue(options))
+    config.setMaster(SPARK_MASTER.getValue(options))
+    config.setDeployMode(SPARK_DEPLOY_MODE.getValue(options))
+
+    // Application identity and artifacts.
+    config.setAppResource(SPARK_APP_RESOURCE.getValue(options))
+    config.setAppName(SPARK_APP_NAME.getValue(options))
+    config.setJars(SPARK_EXTRA_JARS.getValue(options)) // todo
+
+    // Driver/executor sizing, proxy user and queue.
+    config.setDriverMemory(LINKIS_SPARK_DRIVER_MEMORY.getValue(options))
+    config.setDriverJavaOptions(SPARK_DRIVER_EXTRA_JAVA_OPTIONS.getValue(options))
+    config.setDriverClassPath(SPARK_DRIVER_CLASSPATH.getValue(options))
+    config.setExecutorMemory(LINKIS_SPARK_EXECUTOR_MEMORY.getValue(options))
+    config.setProxyUser(PROXY_USER.getValue(options))
+    config.setDriverCores(LINKIS_SPARK_DRIVER_CORES.getValue(options))
+    config.setExecutorCores(LINKIS_SPARK_EXECUTOR_CORES.getValue(options))
+    config.setNumExecutors(LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(options))
+    config.setQueue(LINKIS_QUEUE_NAME.getValue(options))
+
+    logger.info(s"spark_info: $config")
+    config
+  }
+
+  /**
+   * Creates an [[ExecutionContext]] that carries the Spark config of the given environment
+   * context. `options` is accepted but not read yet.
+   */
+  def createExecutionContext(
+      options: util.Map[String, String],
+      environmentContext: EnvironmentContext
+  ): ExecutionContext = {
+    val executionContext = new ExecutionContext()
+    executionContext.setSparkConfig(environmentContext.getSparkConfig)
+    // todo
+    executionContext
+  }
+
+  def createSparkEngineSession(engineCreationContext: EngineCreationContext): 
SparkEngineSession = {
     val options = engineCreationContext.getOptions
-    val useSparkSubmit = true
     val sparkConf: SparkConf = new SparkConf(true)
     val master =
       sparkConf.getOption("spark.master").getOrElse(CommonVars("spark.master", 
"yarn").getValue)
@@ -188,7 +260,8 @@ class SparkEngineConnFactory extends 
MultiExecutorEngineConnFactory with Logging
     new SparkSqlExecutorFactory,
     new SparkPythonExecutorFactory,
     new SparkScalaExecutorFactory,
-    new SparkDataCalcExecutorFactory
+    new SparkDataCalcExecutorFactory,
+    new SparkOnceExecutorFactory
   )
 
   override def getExecutorFactories: Array[ExecutorFactory] = {
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
new file mode 100644
index 000000000..25e264944
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkOnceExecutorFactory.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.once.executor.OnceExecutor
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
+import org.apache.linkis.engineplugin.spark.context.SparkEngineConnContext
+import org.apache.linkis.engineplugin.spark.executor.SparkSubmitOnceExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+/**
+ * Executor factory for the JAR run type: turns the engine conn's
+ * [[SparkEngineConnContext]] session into a [[SparkSubmitOnceExecutor]].
+ */
+class SparkOnceExecutorFactory extends OnceExecutorFactory {
+
+  /**
+   * Creates the once executor for the given engine conn.
+   *
+   * @param id executor id handed to the new executor
+   * @param engineCreationContext creation context (not used here)
+   * @param engineConn engine conn whose session must be a SparkEngineConnContext
+   * @param labels labels of the request (not used here)
+   * @throws IllegalStateException when the session is not a SparkEngineConnContext,
+   *   instead of failing with an uninformative scala.MatchError
+   */
+  override protected def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn,
+      labels: Array[Label[_]]
+  ): OnceExecutor =
+    engineConn.getEngineConnSession match {
+      case context: SparkEngineConnContext =>
+        new SparkSubmitOnceExecutor(id, context)
+      case other =>
+        // Name the offending session type so a wrongly routed request is easy to diagnose.
+        val actual = Option(other).fold("null")(_.getClass.getName)
+        throw new IllegalStateException(
+          s"SparkOnceExecutorFactory requires a SparkEngineConnContext session, but got: $actual"
+        )
+    }
+
+  override protected def getRunType: RunType = RunType.JAR
+}
diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala
new file mode 100644
index 000000000..8be047e2f
--- /dev/null
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkEngineConnLaunchBuilder.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.launch
+
+import org.apache.linkis.common.utils.JsonUtils
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+  SPARK_CONF_DIR_ENV,
+  SPARK_HOME_ENV
+}
+import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration
+import org.apache.linkis.hadoop.common.conf.HadoopConf
+import org.apache.linkis.manager.common.protocol.bml.BmlResource
+import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
+import 
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
+import 
org.apache.linkis.manager.engineplugin.common.launch.process.Environment.{variable,
 USER}
+import 
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import 
org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants.addPathToClassPath
+import org.apache.linkis.manager.label.entity.engine.{
+  EngineConnMode,
+  EngineConnModeLabel,
+  UserCreatorLabel
+}
+import org.apache.linkis.manager.label.utils.LabelUtil
+
+import java.util
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+import com.google.common.collect.Lists
+
+/**
+ * Launch builder of the Spark engine conn.
+ *
+ * Behaviour is switched on [[isOnceMode]]: in once mode the inherited Java-process behaviour is
+ * used with Spark-specific additions (class path, BML resources, required environment, JVM
+ * options); otherwise the launch command is produced by
+ * [[SparkSubmitProcessEngineConnLaunchBuilder]].
+ */
+class SparkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
+
+  /**
+   * Builds the launch command.
+   *
+   * In once mode the value of `LINKIS_SPARK_DRIVER_MEMORY`, resolved against the request
+   * properties, is written into the properties under the `ENGINE_CONN_MEMORY` key before the
+   * inherited command is built. Otherwise the command comes from
+   * [[SparkSubmitProcessEngineConnLaunchBuilder]].
+   *
+   * @param engineConnBuildRequest
+   *   build request; its creation properties are mutated in once mode
+   * @return
+   *   the command line as an array of tokens
+   */
+  override protected def getCommands(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): Array[String] = {
+    if (isOnceMode) {
+      val properties = engineConnBuildRequest.engineConnCreationDesc.properties
+      properties.put(
+        EnvConfiguration.ENGINE_CONN_MEMORY.key,
+        SparkResourceConfiguration.LINKIS_SPARK_DRIVER_MEMORY.getValue(properties)
+      )
+      super.getCommands
+    } else {
+      new SparkSubmitProcessEngineConnLaunchBuilder(this).getCommands(
+        engineConnBuildRequest,
+        getMainClass,
+        getGcLogDir(engineConnBuildRequest),
+        getLogDir(engineConnBuildRequest)
+      )
+    }
+  }
+
+  /**
+   * Whether the engine-conn-mode label of the build request resolves to `EngineConnMode.Once`.
+   *
+   * NOTE(review): `engineConnBuildRequest` is not a parameter here, so it resolves to a member
+   * declared outside this class (presumably the inherited build-request field) -- confirm it is
+   * always set before this is called.
+   */
+  def isOnceMode: Boolean = {
+    val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels)
+    EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once
+  }
+
+  /**
+   * Returns the inherited environment; in once mode `$SPARK_HOME/jars/*` (using the
+   * `SPARK_HOME_ENV` variable name) is additionally put on the class path.
+   */
+  override def getEnvironment(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): util.Map[String, String] = {
+    val environment = super.getEnvironment
+    if (isOnceMode) {
+      addPathToClassPath(environment, s"$$$SPARK_HOME_ENV/jars/*")
+    }
+    environment
+  }
+
+  /**
+   * Returns the BML resources of the engine conn.
+   *
+   * Outside once mode this is the inherited list. In once mode a copy of the inherited list is
+   * extended with the resources described by two optional JSON properties, which are removed
+   * from the request properties once consumed:
+   *   - `spark.app.main.class.jar.bml.json`: one JSON object
+   *   - `spark.app.user.class.path.bml.json`: a JSON array of objects
+   *
+   * Each object is read for `fileName`, `resourceId` and `version`.
+   *
+   * @throws NoSuchElementException
+   *   in once mode, if the request carries no `UserCreatorLabel`
+   */
+  override protected def getBmlResources(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): java.util.List[BmlResource] = {
+    if (!isOnceMode) {
+      super.getBmlResources
+    } else {
+      val bmlResources = new java.util.ArrayList[BmlResource](super.getBmlResources)
+      val properties = engineConnBuildRequest.engineConnCreationDesc.properties
+      // The owner of every added resource is the user named by the UserCreatorLabel.
+      val userName = engineConnBuildRequest.labels.asScala
+        .collectFirst { case label: UserCreatorLabel => label.getUser }
+        .getOrElse(
+          throw new NoSuchElementException(
+            "No UserCreatorLabel found in the labels of the engine conn build request"
+          )
+        )
+      val ticketId = engineConnBuildRequest.ticketId
+
+      // Logs and registers one additional resource.
+      def addBml(bml: BmlResource): Unit = {
+        val bmlString =
+          s"BmlResource(${bml.getFileName}, ${bml.getResourceId}, ${bml.getVersion})"
+        logger.info(s"Add a $bmlString for user $userName and ticketId $ticketId")
+        bmlResources.add(bml)
+      }
+
+      val mainClassJarKey = "spark.app.main.class.jar.bml.json"
+      Option(properties.get(mainClassJarKey)).foreach { mainClassJarContent =>
+        addBml(contentToBmlResource(userName, mainClassJarContent))
+        properties.remove(mainClassJarKey)
+      }
+
+      val userClassPathKey = "spark.app.user.class.path.bml.json"
+      Option(properties.get(userClassPathKey)).foreach { classpathContent =>
+        val contentList = JsonUtils.jackson.readValue(
+          classpathContent,
+          classOf[java.util.List[java.util.Map[String, Object]]]
+        )
+        // Convert every entry first, then register them, so a malformed entry adds nothing.
+        contentList.asScala.map(contentToBmlResource(userName, _)).foreach(addBml)
+        properties.remove(userClassPathKey)
+      }
+      bmlResources
+    }
+  }
+
+  /** Parses `content` as a JSON object and converts it to a [[BmlResource]] owned by `userName`. */
+  private def contentToBmlResource(userName: String, content: String): BmlResource = {
+    val contentMap =
+      JsonUtils.jackson.readValue(content, classOf[java.util.Map[String, Object]])
+    contentToBmlResource(userName, contentMap)
+  }
+
+  /**
+   * Builds a private [[BmlResource]] owned by `userName` from the `fileName`, `resourceId` and
+   * `version` entries of `contentMap`; a missing entry yields a `null` field.
+   */
+  private def contentToBmlResource(
+      userName: String,
+      contentMap: java.util.Map[String, Object]
+  ): BmlResource = {
+    val bmlResource = new BmlResource
+    bmlResource.setFileName(contentMap.get("fileName").asInstanceOf[String])
+    bmlResource.setResourceId(contentMap.get("resourceId").asInstanceOf[String])
+    bmlResource.setVersion(contentMap.get("version").asInstanceOf[String])
+    bmlResource.setOwner(userName)
+    bmlResource.setVisibility(BmlResource.BmlResourceVisibility.Private)
+    bmlResource
+  }
+
+  /** In once mode `SPARK_HOME_ENV` and `SPARK_CONF_DIR_ENV` are prepended to the inherited list. */
+  override protected def getNecessaryEnvironment(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): Array[String] = if (isOnceMode) {
+    Array(SPARK_HOME_ENV, SPARK_CONF_DIR_ENV) ++: super.getNecessaryEnvironment
+  } else {
+    super.getNecessaryEnvironment
+  }
+
+  /**
+   * Extra JVM options. In once mode with `HadoopConf.KEYTAB_PROXYUSER_ENABLED` set,
+   * `-DHADOOP_PROXY_USER=<user variable>` is appended to the inherited options.
+   */
+  override protected def getExtractJavaOpts: String =
+    if (isOnceMode && HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
+      // Trim the whole concatenation. Trimming only the appended literal would strip the
+      // separating space and glue the new option onto the last inherited one.
+      (super.getExtractJavaOpts + s" -DHADOOP_PROXY_USER=${variable(USER)}").trim
+    } else {
+      super.getExtractJavaOpts
+    }
+
+  /** Always `true` in once mode, otherwise the inherited value. */
+  override protected def ifAddHiveConfigPath: Boolean =
+    isOnceMode || super.ifAddHiveConfigPath
+
+  /** The public module is enabled only outside once mode. */
+  override def enablePublicModule: Boolean = !isOnceMode
+
+  /** Inherited hooks in once mode; otherwise only `JarUDFLoadECMHook`. */
+  override protected def getEngineConnManagerHooks(implicit
+      engineConnBuildRequest: EngineConnBuildRequest
+  ): java.util.List[String] = if (isOnceMode) {
+    super.getEngineConnManagerHooks(engineConnBuildRequest)
+  } else {
+    Lists.newArrayList("JarUDFLoadECMHook")
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index f9b53f8e1..f25a1d55b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -18,347 +18,152 @@
 package org.apache.linkis.engineplugin.spark.launch
 
 import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.engineplugin.spark.config.{SparkConfiguration, 
SparkResourceConfiguration}
-import 
org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration.LINKIS_SPARK_DRIVER_MEMORY
-import 
org.apache.linkis.engineplugin.spark.launch.SparkSubmitProcessEngineConnLaunchBuilder.{
-  getValueAndRemove,
-  AbsolutePath,
-  Path,
-  RelativePath
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
+  ENGINE_JAR,
+  SPARK_APP_NAME,
+  SPARK_DEFAULT_EXTERNAL_JARS_PATH,
+  SPARK_DRIVER_CLASSPATH,
+  SPARK_DRIVER_EXTRA_JAVA_OPTIONS,
+  SPARK_PYTHON_VERSION,
+  SPARK_SUBMIT_PATH
 }
+import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
 import org.apache.linkis.hadoop.common.conf.HadoopConf
-import 
org.apache.linkis.manager.common.entity.resource.{DriverAndYarnResource, 
NodeResource}
+import org.apache.linkis.manager.common.entity.resource.DriverAndYarnResource
 import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
 import 
org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
 import org.apache.linkis.protocol.UserWithCreator
 
 import org.apache.commons.lang3.StringUtils
 
-import java.lang.ProcessBuilder.Redirect
-import java.util
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.collect.Lists
-
-class SparkSubmitProcessEngineConnLaunchBuilder private extends 
JavaProcessEngineConnLaunchBuilder {
-
-  private[this] val fsRoot = "hdfs://"
-  protected var port: Int = _
-  protected var request: EngineConnBuildRequest = _
-  protected var userEngineResource: NodeResource = _
-  private[this] var _executable: Path = _
-  private[this] var _master: Option[String] = None
-  private[this] var _deployMode: Option[String] = None
-  private[this] var _className: Option[String] = None
-  private[this] var _name: Option[String] = None
-  private[this] var _jars: ArrayBuffer[Path] = ArrayBuffer()
-  private[this] var _pyFiles: ArrayBuffer[Path] = ArrayBuffer()
-  private[this] var _files: ArrayBuffer[Path] = ArrayBuffer()
-  private[this] var _conf: ArrayBuffer[(String, String)] = ArrayBuffer()
-  private[this] var _driverMemory: Option[String] = None
-  private[this] var _driverJavaOptions: Option[String] = None
-  private[this] var _driverClassPath: ArrayBuffer[String] = ArrayBuffer()
-  private[this] var _executorMemory: Option[String] = None
-  private[this] var _proxyUser: Option[String] = None
-
-  private[this] var _driverCores: Option[String] = None
-  private[this] var _executorCores: Option[String] = None
-  private[this] var _queue: Option[String] = None
-  private[this] var _numExecutors: Option[String] = None
-  private[this] var _archives: ArrayBuffer[Path] = ArrayBuffer()
-
-  private[this] var _env: ArrayBuffer[(String, String)] = ArrayBuffer()
-  private[this] var _redirectOutput: Option[ProcessBuilder.Redirect] = None
-  private[this] var _redirectError: Option[ProcessBuilder.Redirect] = None
-  private[this] var _redirectErrorStream: Option[Boolean] = None
-
-  private[this] var _userWithCreator: UserWithCreator =
-    UserWithCreator("DefaultUser", "DefaultCreator")
-
-  private[this] var _labels: ArrayBuffer[Label[_]] = ArrayBuffer()
-
-  def executable(executable: Path): SparkSubmitProcessEngineConnLaunchBuilder 
= {
-    _executable = executable
-    this
-  }
-
-  def jars(jars: Traversable[Path]): SparkSubmitProcessEngineConnLaunchBuilder 
= {
-    this._jars ++= jars
-    this
-  }
-
-  def pyFile(pyFile: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._pyFiles += pyFile
-    this
-  }
-
-  def pyFiles(pyFiles: Traversable[Path]): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._pyFiles ++= pyFiles
-    this
-  }
-
-  def files(files: Traversable[Path]): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._files ++= files
-    this
-  }
+class SparkSubmitProcessEngineConnLaunchBuilder(builder: 
JavaProcessEngineConnLaunchBuilder) {
 
-  def conf(conf: Traversable[(String, String)]): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._conf ++= conf
-    this
-  }
-
-  def conf(conf: (String, String)): SparkSubmitProcessEngineConnLaunchBuilder =
-    this.conf(conf._1, conf._2)
-
-  def driverJavaOptions(driverJavaOptions: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _driverJavaOptions = Some(driverJavaOptions)
-    this
-  }
-
-  def driverClassPaths(
-      classPaths: Traversable[String]
-  ): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _driverClassPath ++= classPaths
-    this
-  }
+  def getCommands(
+      engineConnBuildRequest: EngineConnBuildRequest,
+      mainClass: String,
+      gcLogDir: String,
+      logDir: String
+  ): Array[String] = {
+    val userEngineResource = engineConnBuildRequest.engineResource
+    val darResource = 
userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
+    val properties = engineConnBuildRequest.engineConnCreationDesc.properties
 
-  def archives(archives: Traversable[Path]): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    archives.foreach(archive)
-    this
-  }
+    val className = getValueAndRemove(properties, "className", mainClass)
+    val driverCores = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_CORES)
+    val driverMemory = getValueAndRemove(properties, 
LINKIS_SPARK_DRIVER_MEMORY)
+    val executorCores = getValueAndRemove(properties, 
LINKIS_SPARK_EXECUTOR_CORES)
+    val executorMemory = getValueAndRemove(properties, 
LINKIS_SPARK_EXECUTOR_MEMORY)
+    val numExecutors = getValueAndRemove(properties, 
LINKIS_SPARK_EXECUTOR_INSTANCES)
+
+    val files = getValueAndRemove(properties, "files", 
"").split(",").filter(isNotBlankPath)
+    val jars = new ArrayBuffer[String]()
+    jars ++= getValueAndRemove(properties, "jars", 
"").split(",").filter(isNotBlankPath)
+    jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
+      .split(",")
+      .filter(x => {
+        isNotBlankPath(x) && (new java.io.File(x)).isFile
+      })
+    val pyFiles = getValueAndRemove(properties, "py-files", 
"").split(",").filter(isNotBlankPath)
+    val archives = getValueAndRemove(properties, "archives", 
"").split(",").filter(isNotBlankPath)
 
-  def archive(archive: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _archives += archive
-    this
-  }
+    val queue = if (null != darResource) {
+      darResource.yarnResource.queueName
+    } else {
+      "default"
+    }
 
-  def redirectError(
-      redirect: ProcessBuilder.Redirect
-  ): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _redirectError = Some(redirect)
-    this
-  }
+    val driverClassPath =
+      Array(getValueAndRemove(properties, SPARK_DRIVER_CLASSPATH), 
variable(CLASSPATH))
 
-  def setPort(port: Int): Unit = this.port = port
+    var userWithCreator: UserWithCreator = UserWithCreator("DefaultUser", 
"DefaultCreator")
+    engineConnBuildRequest.labels.asScala.foreach {
+      case label: UserCreatorLabel =>
+        userWithCreator = UserWithCreator(label.getUser, label.getCreator)
+      case _ =>
+    }
+    val appName = getValueAndRemove(properties, SPARK_APP_NAME) + "_" + 
userWithCreator.creator
 
-  override protected def getCommands(implicit
-      engineConnBuildRequest: EngineConnBuildRequest
-  ): Array[String] = {
-    build(engineConnBuildRequest)
     val commandLine: ArrayBuffer[String] = ArrayBuffer[String]()
-    commandLine += SparkConfiguration.SPARK_SUBMIT_PATH.getValue
-
-    def addOpt(option: String, value: Option[String]): Unit = {
-      value.foreach { v =>
-        commandLine += option
-        commandLine += v
-      }
-    }
+    commandLine += SPARK_SUBMIT_PATH.getValue
 
-    def addList(option: String, values: Traversable[String]): Unit = {
-      if (values.nonEmpty) {
+    def addOpt(option: String, value: String): Unit = {
+      if (StringUtils.isNotBlank(value)) {
         commandLine += option
-        commandLine += values.mkString(",")
+        commandLine += value
       }
     }
 
-    def addClasspath(option: String, values: Traversable[String]): Unit = {
-      if (values.nonEmpty) {
-        commandLine += option
-        commandLine += values.mkString(":")
+    def addProxyUser(): Unit = {
+      if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) return
+      val proxyUser = getValueAndRemove(properties, "proxyUser", "")
+      if (StringUtils.isNotBlank(proxyUser)) {
+        addOpt("--proxy-user", proxyUser)
+      } else {
+        addOpt("--proxy-user", userWithCreator.user)
       }
     }
 
-    addOpt("--master", _master)
-    addOpt("--deploy-mode", _deployMode)
-    addOpt("--name", _name)
-
-    if (HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue && _proxyUser.nonEmpty) {
-      addOpt("--proxy-user", _proxyUser)
+    def getMemory(memory: String): String = if (StringUtils.isNumeric(memory)) 
{
+      memory + "g"
+    } else {
+      memory
     }
 
-    // addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue))
-    // info("No need to add jars for " + _jars.map(fromPath).exists(x => 
x.equals("hdfs:///")).toString())
-    _jars = _jars.filter(_.isNotBlankPath())
-
-    if (_jars.isEmpty) {
-      _jars += AbsolutePath("")
-    }
-    _jars += AbsolutePath(variable(UDF_JARS))
-    if (_jars.nonEmpty) {
-      addList("--jars", _jars.map(fromPath))
-    }
+    addOpt("--master", "yarn")
+    addOpt("--deploy-mode", "client")
+    addOpt("--name", appName)
+    addProxyUser()
 
-    _pyFiles = _pyFiles.filter(_.isNotBlankPath())
-    if (_pyFiles.nonEmpty) {
-      addList("--py-files", _pyFiles.map(fromPath))
+    if (jars.isEmpty) {
+      jars += ""
     }
+    jars += variable(UDF_JARS)
 
-    _files = _files.filter(_.isNotBlankPath())
-    if (_files.nonEmpty) {
-      addList("--files", _files.map(fromPath))
-    }
+    addOpt("--jars", jars.mkString(","))
+    addOpt("--py-files", pyFiles.mkString(","))
+    addOpt("--files", files.mkString(","))
+    addOpt("--archives", archives.mkString(","))
+    addOpt("--driver-class-path", driverClassPath.mkString(":"))
+    addOpt("--driver-memory", getMemory(driverMemory))
+    addOpt("--driver-cores", driverCores.toString)
+    addOpt("--executor-memory", getMemory(executorMemory))
+    addOpt("--executor-cores", executorCores.toString)
+    addOpt("--num-executors", numExecutors.toString)
+    addOpt("--queue", queue)
 
-    _archives = _archives.filter(_.isNotBlankPath())
-    if (_archives.nonEmpty) {
-      addList("--archives", _archives.map(fromPath))
-    }
-    _conf.foreach { case (key, value) =>
-      if (key.startsWith("spark.")) {
-        // subcommand cannot be quoted by double quote, use single quote 
instead
-        addOpt("--conf", Some(key + "=\"" + value + "\""))
-      }
+    getConf(engineConnBuildRequest, gcLogDir, logDir).foreach { case (key, 
value) =>
+      addOpt("--conf", s"""$key="$value"""")
     }
-    addOpt("--driver-memory", _driverMemory)
-    addClasspath("--driver-class-path", _driverClassPath)
-    addOpt("--driver-cores", _driverCores)
-    addOpt("--executor-memory", _executorMemory)
-    addOpt("--executor-cores", _executorCores)
-    addOpt("--num-executors", _numExecutors)
-    addOpt("--queue", _queue)
-
-    addOpt("--class", _className)
-    addOpt("1>", Some(s"${variable(LOG_DIRS)}/stdout"))
-    addOpt("2>>", Some(s"${variable(LOG_DIRS)}/stderr"))
 
-    addOpt("", Some(s" 
${variable(PWD)}/lib/${SparkConfiguration.ENGINE_JAR.getValue}"))
+    addOpt("--class", className)
+    addOpt("1>", s"${variable(LOG_DIRS)}/stdout")
+    addOpt("2>>", s"${variable(LOG_DIRS)}/stderr")
+    addOpt("", s" ${variable(PWD)}/lib/${ENGINE_JAR.getValue}")
 
     commandLine.toArray.filter(StringUtils.isNotEmpty)
   }
 
-  override def enablePublicModule = true
-
-  def master(masterUrl: String): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _master = Some(masterUrl)
-    this
-  }
-
-  def deployMode(deployMode: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _deployMode = Some(deployMode)
-    this
-  }
-
-  def className(className: String): SparkSubmitProcessEngineConnLaunchBuilder 
= {
-    _className = Some(className)
-    this
-  }
-
-  def name(name: String): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _name = Some(name)
-    this
-  }
-
-  def jar(jar: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._jars += jar
-    this
-  }
-
-  def file(file: Path): SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._files += file
-    this
-  }
-
-  def conf(key: String, value: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    this._conf += ((key, value))
-    this
-  }
-
-  def driverMemory(driverMemory: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _driverMemory = Some(driverMemory)
-    this
-  }
-
-  def driverClassPath(classPath: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _driverClassPath += classPath
-    this
-  }
-
-  def executorMemory(executorMemory: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _executorMemory = Some(executorMemory)
-    this
-  }
-
-  def proxyUser(proxyUser: String): SparkSubmitProcessEngineConnLaunchBuilder 
= {
-    _proxyUser = Some(proxyUser)
-    this
-  }
-
-  def driverCores(driverCores: Int): SparkSubmitProcessEngineConnLaunchBuilder 
= {
-    this.driverCores(driverCores.toString)
-  }
-
-  def driverCores(driverCores: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _driverCores = Some(driverCores)
-    this
-  }
-
-  def executorCores(executorCores: Int): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    this.executorCores(executorCores.toString)
-  }
-
-  def executorCores(executorCores: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _executorCores = Some(executorCores)
-    this
-  }
-
-  def numExecutors(numExecutors: Int): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    this.numExecutors(numExecutors.toString)
-  }
-
-  def numExecutors(numExecutors: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _numExecutors = Some(numExecutors)
-    this
-  }
-
-  def queue(queue: String): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _queue = Some(queue)
-    this
-  }
-
-  def env(key: String, value: String): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _env += ((key, value))
-    this
-  }
-
-  def redirectOutput(
-      redirect: ProcessBuilder.Redirect
-  ): SparkSubmitProcessEngineConnLaunchBuilder = {
-    _redirectOutput = Some(redirect)
-    this
-  }
-
-  def redirectErrorStream(redirect: Boolean): 
SparkSubmitProcessEngineConnLaunchBuilder = {
-    _redirectErrorStream = Some(redirect)
-    this
-  }
-
-  def getEngineResource: NodeResource = userEngineResource
-
-  def build(engineRequest: EngineConnBuildRequest): Unit = {
-    this.request = engineRequest
-    this.userEngineResource = engineRequest.engineResource
-    val darResource: DriverAndYarnResource =
-      userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
-    val properties = engineRequest.engineConnCreationDesc.properties
-    this.master("yarn")
-    this.deployMode("client")
+  def getConf(
+      engineConnBuildRequest: EngineConnBuildRequest,
+      gcLogDir: String,
+      logDir: String
+  ): ArrayBuffer[(String, String)] = {
     val driverJavaSet = new StringBuilder(" -server")
-
     if 
(StringUtils.isNotEmpty(EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue))
 {
       EnvConfiguration.ENGINE_CONN_DEFAULT_JAVA_OPTS.getValue
-        .format(getGcLogDir(engineRequest))
+        .format(gcLogDir)
         .split("\\s+")
         .foreach(l => {
           driverJavaSet.append(" ").append(l)
         })
     }
-    getLogDir(engineRequest).trim
+    logDir.trim
       .split(" ")
       .foreach(l => {
         driverJavaSet.append(" ").append(l)
@@ -369,142 +174,35 @@ class SparkSubmitProcessEngineConnLaunchBuilder private 
extends JavaProcessEngin
         s" 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=${variable(RANDOM_PORT)}"
       )
     }
-    this.conf(SparkConfiguration.SPARK_DRIVER_EXTRA_JAVA_OPTIONS.key, 
driverJavaSet.toString())
-    // this.conf("spark.sql.extensions", 
"org.apache.linkis.hook.spark.extension.SparkHistoryExtension")
-    this.className(getValueAndRemove(properties, "className", getMainClass))
-
-    getValueAndRemove(properties, "archives", "").toString
-      .split(",")
-      .map(AbsolutePath)
-      .foreach(this.archive)
-    this.driverCores(
-      getValueAndRemove(properties, 
SparkResourceConfiguration.LINKIS_SPARK_DRIVER_CORES)
-    )
-    val driverMemory = getValueAndRemove(properties, 
LINKIS_SPARK_DRIVER_MEMORY)
-    val driverMemoryWithUnit = if (StringUtils.isNumeric(driverMemory)) {
-      driverMemory + "g"
-    } else {
-      driverMemory
-    }
-    this.driverMemory(driverMemoryWithUnit)
-    this.executorCores(
-      getValueAndRemove(properties, 
SparkResourceConfiguration.LINKIS_SPARK_EXECUTOR_CORES)
-    )
-    val executorMemory =
-      getValueAndRemove(properties, 
SparkResourceConfiguration.LINKIS_SPARK_EXECUTOR_MEMORY)
-    val executorMemoryWithUnit = if (StringUtils.isNumeric(executorMemory)) {
-      executorMemory + "g"
-    } else {
-      executorMemory
-    }
-    this.executorMemory(executorMemoryWithUnit)
-    this.numExecutors(
-      getValueAndRemove(properties, 
SparkResourceConfiguration.LINKIS_SPARK_EXECUTOR_INSTANCES)
-    )
-    getValueAndRemove(properties, "files", 
"").split(",").map(AbsolutePath).foreach(file)
-    getValueAndRemove(properties, "jars", 
"").split(",").map(AbsolutePath).foreach(jar)
-    val defaultExternalJars =
-      getValueAndRemove(properties, 
SparkConfiguration.SPARK_DEFAULT_EXTERNAL_JARS_PATH)
-    defaultExternalJars
-      .split(",")
-      .map(AbsolutePath)
-      .filter(x => {
-        val file = new java.io.File(x.path)
-        file.isFile
-      })
-      .foreach(jar)
-
-    proxyUser(getValueAndRemove(properties, "proxyUser", ""))
-    if (null != darResource) {
-      this.queue(darResource.yarnResource.queueName)
-    } else {
-      this.queue("default")
-    }
-
-    this.driverClassPath(getValueAndRemove(properties, 
SparkConfiguration.SPARK_DRIVER_CLASSPATH))
-    this.driverClassPath(variable(CLASSPATH))
-    this.redirectOutput(Redirect.PIPE)
-    this.redirectErrorStream(true)
-
-    val labels = engineRequest.labels.asScala
-    labels.foreach { l =>
-      {
-        this._labels += l
-        l match {
-          case label: UserCreatorLabel =>
-            this._userWithCreator = UserWithCreator(label.getUser, 
label.getCreator)
-          case _ =>
-        }
-      }
-    }
 
-    if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) {
-      this.proxyUser(getValueAndRemove(properties, "proxyUser", ""))
-    } else {
-      this.proxyUser(this._userWithCreator.user)
-    }
+    val conf: ArrayBuffer[(String, String)] = ArrayBuffer()
+    conf += (SPARK_DRIVER_EXTRA_JAVA_OPTIONS.key -> driverJavaSet.toString())
 
     // deal spark conf and spark.hadoop.*
+    val properties = engineConnBuildRequest.engineConnCreationDesc.properties
     val iterator = properties.entrySet().iterator()
     val sparkConfKeys = ArrayBuffer[String]()
     while (iterator.hasNext) {
       val keyValue = iterator.next()
       if (
-          !SparkConfiguration.SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) 
&& keyValue.getKey
-            .startsWith("spark.") && StringUtils.isNotBlank(keyValue.getValue)
+          !SPARK_PYTHON_VERSION.key.equals(keyValue.getKey) &&
+          keyValue.getKey.startsWith("spark.") &&
+          StringUtils.isNotBlank(keyValue.getValue)
       ) {
-        conf(keyValue.getKey, keyValue.getValue)
+        conf += (keyValue.getKey -> keyValue.getValue)
         sparkConfKeys += keyValue.getKey
       }
     }
-    this.name(
-      getValueAndRemove(
-        properties,
-        SparkConfiguration.SPARK_APP_NAME
-      ) + "_" + this._userWithCreator.creator
-    )
     sparkConfKeys.foreach(properties.remove(_))
+    conf
   }
 
-  private def fromPath(path: Path): String = path match {
-    case AbsolutePath(p) => p
-    case RelativePath(p) => p
-  }
-
-  override protected def getEngineConnManagerHooks(implicit
-      engineConnBuildRequest: EngineConnBuildRequest
-  ): util.List[String] = {
-    Lists.newArrayList("JarUDFLoadECMHook")
-  }
-
-}
-
-object SparkSubmitProcessEngineConnLaunchBuilder {
-
-  def newBuilder(): SparkSubmitProcessEngineConnLaunchBuilder =
-    new SparkSubmitProcessEngineConnLaunchBuilder
-
-  sealed trait Path {
-
-    def isNotBlankPath(): Boolean;
-
-    protected def isNotBlankPath(path: String): Boolean = {
-      StringUtils.isNotBlank(path) && !"/".equals(path.trim) && 
!"hdfs:///".equals(
-        path.trim
-      ) && !"file:///".equals(path.trim)
-    }
-
-  }
-
-  case class AbsolutePath(path: String) extends Path {
-    override def isNotBlankPath(): Boolean = isNotBlankPath(path)
-  }
-
-  case class RelativePath(path: String) extends Path {
-    override def isNotBlankPath(): Boolean = isNotBlankPath(path)
+  private def isNotBlankPath(path: String): Boolean = {
+    StringUtils.isNotBlank(path) && !"/".equals(path.trim) &&
+    !"hdfs:///".equals(path.trim) && !"file:///".equals(path.trim)
   }
 
-  def getValueAndRemove[T](
+  private def getValueAndRemove[T](
       properties: java.util.Map[String, String],
       commonVars: CommonVars[T]
   ): T = {
@@ -513,7 +211,7 @@ object SparkSubmitProcessEngineConnLaunchBuilder {
     value
   }
 
-  def getValueAndRemove(
+  private def getValueAndRemove(
       properties: java.util.Map[String, String],
       key: String,
       defaultValue: String
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
new file mode 100644
index 000000000..196414420
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/utils/SparkJobProgressUtil.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.utils
+
+import org.apache.linkis.common.utils.{JsonUtils, Logging}
+import org.apache.linkis.engineconn.launch.EngineConnServer
+import 
org.apache.linkis.engineplugin.spark.config.SparkConfiguration.SPARK_ONCE_YARN_RESTFUL_URL
+import org.apache.linkis.protocol.engine.JobProgressInfo
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+/**
+ * Helpers that read the progress of a Spark application by first asking the configured YARN
+ * RESTful endpoint for the application's tracking URL and then querying the jobs endpoint
+ * exposed under that tracking URL.
+ */
+object SparkJobProgressUtil extends Logging {
+
+  /**
+   * Computes the overall completion ratio of an application.
+   *
+   * Only jobs whose status is RUNNING or SUCCEEDED are taken into account.
+   *
+   * @param applicationId
+   *   application id used in the REST calls; a blank id yields 0
+   * @return
+   *   completed tasks divided by total tasks, or 0 when no matching job or no task is reported
+   */
+  def getProgress(applicationId: String): Float = {
+    if (StringUtils.isBlank(applicationId)) return 0f
+    val (totalTasks, completedTasks) = getSparkJobInfo(applicationId)
+      .filter { job =>
+        val status = job.getOrDefault("status", "").asInstanceOf[String]
+        "RUNNING".equals(status) || "SUCCEEDED".equals(status)
+      }
+      // foldLeft rather than reduce: reduce throws when no job passes the filter
+      .foldLeft((0, 0)) { case ((total, completed), job) =>
+        (total + intValue(job, "numTasks"), completed + intValue(job, "numCompletedTasks"))
+      }
+    // Guard the division: a zero total would otherwise produce NaN or Infinity
+    if (totalTasks <= 0) 0f else completedTasks.toFloat / totalTasks
+  }
+
+  /**
+   * Converts every job reported for the application into a [[JobProgressInfo]].
+   *
+   * @param applicationId
+   *   application id used in the REST calls
+   * @return
+   *   one entry per reported job; empty when nothing could be fetched
+   */
+  def getSparkJobProgressInfo(applicationId: String): Array[JobProgressInfo] =
+    getSparkJobInfo(applicationId).map { job =>
+      JobProgressInfo(
+        getJobId(intValue(job, "jobId"), job.get("jobGroup").asInstanceOf[String]),
+        intValue(job, "numTasks"),
+        intValue(job, "numActiveTasks"),
+        intValue(job, "numFailedTasks"),
+        intValue(job, "numCompletedTasks")
+      )
+    }
+
+  /**
+   * Fetches the raw job descriptions of an application.
+   *
+   * The application is looked up at `<yarn restful url>/ws/v1/cluster/apps/<applicationId>`;
+   * the jobs are then read from `api/v1/applications/<applicationId>/jobs` below the
+   * application's tracking URL.
+   *
+   * @param applicationId
+   *   application id used in the REST calls
+   * @return
+   *   the parsed job entries; empty when the id is blank, a request returns no body, the
+   *   application is unknown, has no tracking URL, or is in state FINISHED
+   */
+  def getSparkJobInfo(applicationId: String): Array[java.util.Map[String, Object]] =
+    if (StringUtils.isBlank(applicationId)) Array.empty
+    else {
+      val yarnRestfulUrl =
+        SPARK_ONCE_YARN_RESTFUL_URL.getValue(EngineConnServer.getEngineCreationContext.getOptions)
+      val getAppUrl = s"$yarnRestfulUrl/ws/v1/cluster/apps/$applicationId"
+      logger.info(s"get yarn app, url: $getAppUrl")
+      val appJson = get(getAppUrl)
+      // get() returns "" on a non-200 reply; parsing an empty document would throw
+      if (StringUtils.isBlank(appJson)) return Array.empty
+      val appResult =
+        JsonUtils.jackson.readValue(appJson, classOf[java.util.Map[String, Object]])
+      if (appResult == null) return Array.empty
+      val app = appResult.get("app").asInstanceOf[java.util.Map[String, Object]]
+      if (app == null) return Array.empty
+      val trackingUrl = app.getOrDefault("trackingUrl", "").asInstanceOf[String]
+      val state = app.getOrDefault("state", "").asInstanceOf[String]
+      if (StringUtils.isBlank(trackingUrl) || "FINISHED".equals(state)) {
+        return Array.empty
+      }
+      // Tolerate a tracking URL that does not end with a slash
+      val trackingBase = if (trackingUrl.endsWith("/")) trackingUrl else trackingUrl + "/"
+      val getSparkJobsUrl = s"${trackingBase}api/v1/applications/$applicationId/jobs"
+      logger.info(s"get spark jobs, url: $getSparkJobsUrl")
+      val jobs = get(getSparkJobsUrl)
+      if (StringUtils.isBlank(jobs)) {
+        return Array.empty
+      }
+      // Parse the body fetched above instead of issuing the same request a second time
+      JsonUtils.jackson.readValue(jobs, classOf[Array[java.util.Map[String, Object]]])
+    }
+
+  /**
+   * Performs an HTTP GET and returns the response body.
+   *
+   * The client and the response are always closed, even when the request or the body read
+   * fails.
+   *
+   * @param url
+   *   address to request
+   * @return
+   *   the body decoded as UTF-8 when the status code is 200, otherwise an empty string
+   */
+  def get(url: String): String = {
+    val client = HttpClients.createDefault
+    try {
+      val response = client.execute(new HttpGet(url))
+      try {
+        val statusCode = response.getStatusLine.getStatusCode
+        if (statusCode == 200) {
+          val str = EntityUtils.toString(response.getEntity, "UTF-8")
+          logger.info(s"url: $url")
+          logger.info(s"response: $str")
+          str
+        } else {
+          logger.warn(s"url: $url, unexpected status code: $statusCode")
+          ""
+        }
+      } finally {
+        response.close()
+      }
+    } finally {
+      client.close()
+    }
+  }
+
+  // Reads a numeric field of a job entry; a missing or non-numeric value counts as 0.
+  private def intValue(job: java.util.Map[String, Object], key: String): Int =
+    job.get(key) match {
+      case number: Number => number.intValue()
+      case _ => 0
+    }
+
+  // Builds the display id of a job, e.g. "jobId-3(group)".
+  private def getJobId(jobId: Int, jobGroup: String): String =
+    "jobId-" + jobId + "(" + jobGroup + ")"
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala
index 3cb76869b..1c9d45b1e 100644
--- a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/launch/TestSparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -17,21 +17,13 @@
 
 package org.apache.linkis.engineplugin.spark.launch
 
-import 
org.apache.linkis.engineplugin.spark.launch.SparkSubmitProcessEngineConnLaunchBuilder.RelativePath
-
 import org.junit.jupiter.api.{Assertions, Test}
 
 class TestSparkSubmitProcessEngineConnLaunchBuilder {
 
   @Test
   def testCreateContext: Unit = {
-    val ddd = SparkSubmitProcessEngineConnLaunchBuilder
-      .newBuilder()
-      .master("local[1]")
-      .deployMode("client")
-      .className("org.apache.linkis.engineplugin.spark.launch")
-      .driverClassPath("")
-      .archive(RelativePath(""))
+    val ddd = new SparkEngineConnLaunchBuilder()
     Assertions.assertNotNull(ddd)
   }
 
diff --git a/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala b/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala
index 90fc3acd9..5aea274df 100644
--- a/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala
+++ b/linkis-engineconn-plugins/sqoop/src/main/scala/org/apache/linkis/engineconnplugin/sqoop/SqoopEngineConnPlugin.scala
@@ -46,12 +46,7 @@ class SqoopEngineConnPlugin extends EngineConnPlugin {
   }
 
   override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
-    EP_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      if (null == engineConnLaunchBuilder) {
-        engineConnLaunchBuilder = new SqoopEngineConnLaunchBuilder()
-      }
-      engineConnLaunchBuilder
-    }
+    new SqoopEngineConnLaunchBuilder()
   }
 
   override def getEngineConnFactory: EngineConnFactory = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to