This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new bca9fe22ff Add a containerized mode to the ECM service (#5201)
bca9fe22ff is described below

commit bca9fe22ffa3f5bfaedfd18ed631ecf6d23e6c30
Author: LiuGuoHua <[email protected]>
AuthorDate: Wed Nov 27 16:26:34 2024 +0800

    Add a containerized mode to the ECM service (#5201)
    
    * Add a containerized mode to the ECM service. In this mode, particular 
engines can be assigned specific IPs and ports for communicating with the 
outside world. For instance, a Spark engine requires at least two ports: 
spark.driver.port and spark.driver.blockManager.port.
    
    * Supplement the SQL for database modification
    
    * remove code that may cause compilation to fail
    
    * improve the linkis_ddl.sql in configmap-init-sql.yaml
    
    * Add all parameters from SparkConf.getConf to sparkLauncher.
    
    * Add a method addAllConf to SparkConf.
    
    * formatted code
    
    * Fix the missing ticketId field in the linkis_cg_manager_service_instance 
table within the configmap-init-sql.yaml file.
    
    * Fix the missing observe_info field in the 
linkis_ps_job_history_group_history table within the configmap-init-sql.yaml file.
    
    * print lm log
    
    * add debug log
    
    * Add fields (mapping_host, mapping_ports) to the 
linkis_cg_manager_service_instance table in the DDL files.
    
    * remove debug action
    
    ---------
    
    Co-authored-by: peacewong <[email protected]>
---
 .github/workflows/integration-test.yml             |  5 +-
 .../org/apache/linkis/common/ServiceInstance.scala | 20 ++++++
 .../enums/MappingPortStrategyName.java             | 39 ++++++++++
 .../strategy/MappingPortContext.java               | 32 +++++++++
 .../strategy/MappingPortStrategy.java              | 24 +++++++
 .../strategy/StaticMappingPortStrategy.java        | 79 +++++++++++++++++++++
 .../ecm/core/conf/ContainerizationConf.scala       | 44 ++++++++++++
 .../ecm/core/launch/ProcessEngineConnLaunch.scala  | 45 ++++++++++++
 .../impl/AbstractEngineConnLaunchService.scala     |  4 +-
 .../linkis/manager/am/vo/AMEngineNodeVo.java       | 20 ++++++
 .../apache/linkis/manager/am/utils/AMUtils.scala   |  2 +
 .../common/entity/persistence/PersistenceNode.java | 19 +++++
 .../impl/DefaultNodeManagerPersistence.java        |  8 +++
 .../resources/mapper/common/NodeManagerMapper.xml  |  6 ++
 .../linkis/templates/configmap-init-sql.yaml       |  4 ++
 linkis-dist/package/db/linkis_ddl.sql              |  2 +
 linkis-dist/package/db/linkis_ddl_pg.sql           |  2 +
 .../db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql   |  4 ++
 .../spark/client/context/SparkConfig.java          |  8 +++
 .../YarnApplicationClusterDescriptorAdapter.java   |  3 +
 .../hooks/SparkContainerizationEngineConnHook.java | 82 ++++++++++++++++++++++
 .../main/resources/linkis-engineconn.properties    |  2 +-
 .../spark/config/SparkConfiguration.scala          | 14 ++++
 .../spark/factory/SparkEngineConnFactory.scala     | 37 ++++++++++
 24 files changed, 500 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/integration-test.yml 
b/.github/workflows/integration-test.yml
index 85d3b9a23e..6e4e5097a2 100644
--- a/.github/workflows/integration-test.yml
+++ b/.github/workflows/integration-test.yml
@@ -166,7 +166,7 @@ jobs:
           # Show port-forward list
           bash ./linkis-dist/helm/scripts/remote-proxy.sh list
           # Check if the web service is available
-          curl http://127.0.0.1:8088/indexhtml
+          curl http://127.0.0.1:8088/
 
           # Execute test by linkis-cli
           POD_NAME=`kubectl get pods -n linkis -l 
app.kubernetes.io/instance=linkis-demo-mg-gateway -o 
jsonpath='{.items[0].metadata.name}'`
@@ -182,5 +182,4 @@ jobs:
 
           #kubectl exec -it -n linkis  ${POD_NAME} -- bash -c " \
           #sh /opt/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType sql 
-code   'show databases' "
-
-        shell: bash
\ No newline at end of file
+        shell: bash
diff --git 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
index f9e4718472..08974df281 100644
--- 
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
+++ 
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
@@ -21,11 +21,19 @@ class ServiceInstance {
   private var applicationName: String = _
   private var instance: String = _
   private var registryTimestamp: Long = _
+  private var mappingPorts: String = _
+  private var mappingHost: String = _
   def setApplicationName(applicationName: String): Unit = this.applicationName 
= applicationName
   def getApplicationName: String = applicationName
   def setInstance(instance: String): Unit = this.instance = instance
   def getInstance: String = instance
 
+  def setMappingPorts(mappingPorts: String): Unit = this.mappingPorts = 
mappingPorts
+  def getMappingPorts: String = mappingPorts
+
+  def setMappingHost(mappingHost: String): Unit = this.mappingHost = 
mappingHost
+  def getMappingHost: String = mappingHost
+
   def setRegistryTimestamp(registryTimestamp: Long): Unit = 
this.registryTimestamp =
     registryTimestamp
 
@@ -62,6 +70,18 @@ object ServiceInstance {
     serviceInstance
   }
 
+  def apply(
+      applicationName: String,
+      instance: String,
+      mappingPorts: String,
+      mappingHost: String
+  ): ServiceInstance = {
+    val serviceInstance = apply(applicationName, instance)
+    serviceInstance.setMappingPorts(mappingPorts)
+    serviceInstance.setMappingHost(mappingHost)
+    serviceInstance
+  }
+
   def apply(applicationName: String, instance: String, registryTimestamp: 
Long): ServiceInstance = {
     val serviceInstance = new ServiceInstance
     serviceInstance.setApplicationName(applicationName)
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java
new file mode 100644
index 0000000000..7d43fc9ae0
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.enums;
+
+public enum MappingPortStrategyName {
+  STATIC("static");
+  private String name;
+
+  MappingPortStrategyName(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public static MappingPortStrategyName toEnum(String name) {
+    return MappingPortStrategyName.valueOf(name.toUpperCase());
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java
new file mode 100644
index 0000000000..ef31b2562d
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.strategy;
+
+import 
org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName;
+
+public class MappingPortContext {
+
+  public static MappingPortStrategy getInstance(MappingPortStrategyName 
strategyName) {
+    switch (strategyName) {
+      case STATIC:
+        return new StaticMappingPortStrategy();
+      default:
+        return new StaticMappingPortStrategy();
+    }
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java
new file mode 100644
index 0000000000..21cf2ac3a0
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.strategy;
+
+import java.io.IOException;
+
+public interface MappingPortStrategy {
+  int availablePort() throws IOException;
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java
new file mode 100644
index 0000000000..6ea627093f
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.strategy;
+
+import org.apache.linkis.ecm.core.conf.ContainerizationConf;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StaticMappingPortStrategy implements MappingPortStrategy {
+
+  private static final AtomicInteger currentIndex = new AtomicInteger(0);
+
+  @Override
+  public int availablePort() throws IOException {
+    return getNewPort(10);
+  }
+
+  public int getNewPort(int retryNum) throws IOException {
+    int[] portRange = getPortRange();
+    if (retryNum == 0) {
+      throw new IOException(
+          "No available port in the portRange: "
+              + 
ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE()
+                  .getValue());
+    }
+    moveIndex();
+    int minPort = portRange[0];
+    int newPort = minPort + currentIndex.get() - 1;
+    ServerSocket socket = null;
+    try {
+      socket = new ServerSocket(newPort);
+    } catch (Exception e) {
+      return getNewPort(--retryNum);
+    } finally {
+      IOUtils.close(socket);
+    }
+    return newPort;
+  }
+
+  private synchronized void moveIndex() {
+    int poolSize = getPoolSize();
+    currentIndex.set(currentIndex.get() % poolSize + 1);
+  }
+
+  private int[] getPortRange() {
+    String portRange =
+        
ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE().getValue();
+
+    return 
Arrays.stream(portRange.split("-")).mapToInt(Integer::parseInt).toArray();
+  }
+
+  private int getPoolSize() {
+    int[] portRange = getPortRange();
+    int minPort = portRange[0];
+    int maxPort = portRange[1];
+
+    return maxPort - minPort + 1;
+  }
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala
new file mode 100644
index 0000000000..a7461210f1
--- /dev/null
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.conf
+
+import org.apache.linkis.common.conf.CommonVars
+
+object ContainerizationConf {
+
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE =
+    CommonVars("linkis.engine.containerization.static.port.range", "1-65535")
+
+  val ENGINE_CONN_CONTAINERIZATION_ENABLE =
+    CommonVars("linkis.engine.containerization.enable", false).getValue
+
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST =
+    CommonVars("linkis.engine.containerization.mapping.host", "")
+
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS =
+    CommonVars("linkis.engine.containerization.mapping.ports", "")
+
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY =
+    CommonVars("linkis.engine.containerization.mapping.strategy", "static")
+
+  // Comma-separated entries of the form "<engineType>-<portCount>":
+  // Engine Type - Number of Ports Required to Be Opened
+  val ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST =
+    CommonVars("linkis.engine.containerization.engine.list", "spark-2,")
+
+}
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
index cc79e24d4f..4dfb96b3d5 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
@@ -20,6 +20,15 @@ package org.apache.linkis.ecm.core.launch
 import org.apache.linkis.common.conf.{CommonVars, Configuration}
 import org.apache.linkis.common.exception.ErrorException
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.ecm.core.conf.ContainerizationConf.{
+  ENGINE_CONN_CONTAINERIZATION_ENABLE,
+  ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST,
+  ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST,
+  ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS,
+  ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY
+}
+import 
org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName
+import org.apache.linkis.ecm.core.containerization.strategy.MappingPortContext
 import org.apache.linkis.ecm.core.errorcode.LinkisECMErrorCodeSummary._
 import org.apache.linkis.ecm.core.exception.ECMCoreException
 import org.apache.linkis.ecm.core.utils.PortUtils
@@ -35,6 +44,7 @@ import 
org.apache.linkis.manager.engineplugin.common.launch.process.{
 }
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
 import 
org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants._
+import org.apache.linkis.manager.label.utils.LabelUtil
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
@@ -54,6 +64,9 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with 
Logging {
 
   private var engineConnPort: String = _
 
+  private var mappingPorts: String = ""
+  private var mappingHost: String = _
+
   protected def newProcessEngineConnCommandBuilder(): 
ProcessEngineCommandBuilder =
     new UnixProcessEngineCommandBuilder
 
@@ -142,6 +155,10 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch 
with Logging {
 
   def getEngineConnPort: String = engineConnPort
 
+  def getMappingPorts: String = mappingPorts
+
+  def getMappingHost: String = mappingHost
+
   protected def getProcess(): Process = this.process
 
   /**
@@ -166,6 +183,20 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch 
with Logging {
       
.findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue)
       .toString
 
+    val engineType = LabelUtil.getEngineType(request.labels)
+    var engineMappingPortSize = getEngineMappingPortSize(engineType)
+    if (ENGINE_CONN_CONTAINERIZATION_ENABLE && engineMappingPortSize > 0) {
+      val strategyName = ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY.getValue
+      val mappingPortStrategy =
+        
MappingPortContext.getInstance(MappingPortStrategyName.toEnum(strategyName))
+
+      while (engineMappingPortSize > 0) {
+        mappingPorts += mappingPortStrategy.availablePort() + ","
+        engineMappingPortSize = engineMappingPortSize - 1
+      }
+      mappingHost = ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.getValue
+    }
+
     var springConf =
       Map[String, String]("server.port" -> engineConnPort, 
"spring.profiles.active" -> "engineconn")
     val properties =
@@ -188,10 +219,24 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch 
with Logging {
     engineConnConf = engineConnConf ++: request.creationDesc.properties.asScala
       .filterNot(_._1.startsWith("spring."))
       .toMap
+
+    engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS.key -> 
mappingPorts)
+    engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.key -> 
mappingHost)
+
     arguments.addEngineConnConf(engineConnConf)
     
EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToArgs(arguments.build())
   }
 
+  def getEngineMappingPortSize(engineType: String): Int = {
+    val engineList = ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST.getValue
+    val infoList = engineList.trim
+      .split(",")
+      .map(_.split("-"))
+      .filter(engine => engine(0).equals(engineType))
+    if (infoList.length > 0) infoList(0)(1).toInt
+    else 0
+  }
+
   override def kill(): Unit = {
     if (process != null) {
       process.destroy()
diff --git 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index df00ed4960..771ceede8f 100644
--- 
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++ 
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -92,7 +92,9 @@ abstract class AbstractEngineConnLaunchService extends 
EngineConnLaunchService w
         case pro: ProcessEngineConnLaunch =>
           val serviceInstance = ServiceInstance(
             GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue,
-            ECMUtils.getInstanceByPort(pro.getEngineConnPort)
+            ECMUtils.getInstanceByPort(pro.getEngineConnPort),
+            pro.getMappingPorts,
+            pro.getMappingHost
           )
           conn.setServiceInstance(serviceInstance)
         case _ =>
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
index d7208f211a..c88f696973 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
@@ -80,6 +80,10 @@ public class AMEngineNodeVo {
 
   private String engineType;
 
+  private String mappingPorts;
+
+  private String mappingHost;
+
   public String getEmInstance() {
     return emInstance;
   }
@@ -288,4 +292,20 @@ public class AMEngineNodeVo {
   public void setEngineType(String engineType) {
     this.engineType = engineType;
   }
+
+  public String getMappingPorts() {
+    return mappingPorts;
+  }
+
+  public void setMappingPorts(String mappingPorts) {
+    this.mappingPorts = mappingPorts;
+  }
+
+  public String getMappingHost() {
+    return mappingHost;
+  }
+
+  public void setMappingHost(String mappingHost) {
+    this.mappingHost = mappingHost;
+  }
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
index 428e5d23e9..a2f4ad97ae 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
+++ 
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
@@ -167,6 +167,8 @@ object AMUtils {
         AMEngineNodeVo.setLabels(node.getLabels)
         
AMEngineNodeVo.setApplicationName(node.getServiceInstance.getApplicationName)
         AMEngineNodeVo.setInstance(node.getServiceInstance.getInstance)
+        AMEngineNodeVo.setMappingHost(node.getServiceInstance.getMappingHost)
+        AMEngineNodeVo.setMappingPorts(node.getServiceInstance.getMappingPorts)
         if (null != node.getEMNode) {
           
AMEngineNodeVo.setEmInstance(node.getEMNode.getServiceInstance.getInstance)
         }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
index 770a2e528a..026f5c6453 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
@@ -36,6 +36,9 @@ public class PersistenceNode {
   private String updator;
   private String creator;
 
+  private String mappingPorts;
+  private String mappingHost;
+
   public String getMark() {
     return mark;
   }
@@ -123,4 +126,20 @@ public class PersistenceNode {
   public void setCreator(String creator) {
     this.creator = creator;
   }
+
+  public String getMappingPorts() {
+    return mappingPorts;
+  }
+
+  public void setMappingPorts(String mappingPorts) {
+    this.mappingPorts = mappingPorts;
+  }
+
+  public String getMappingHost() {
+    return mappingHost;
+  }
+
+  public void setMappingHost(String mappingHost) {
+    this.mappingHost = mappingHost;
+  }
 }
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 31a8742d9f..40f479f496 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -109,6 +109,10 @@ public class DefaultNodeManagerPersistence implements 
NodeManagerPersistence {
         node.getOwner()); // The creator is not given when inserting records 
in rm, so you need to
     // set this value(rm中插入记录的时候并未给出creator,所以需要set这个值)
     persistenceNode.setUpdator(node.getOwner());
+    String mappingPorts = node.getServiceInstance().getMappingPorts();
+    persistenceNode.setMappingPorts(mappingPorts);
+    String mappingHost = node.getServiceInstance().getMappingHost();
+    persistenceNode.setMappingHost(mappingHost);
     try {
       nodeManagerMapper.updateNodeInstance(serviceInstance.getInstance(), 
persistenceNode);
       nodeManagerMapper.updateNodeRelation(
@@ -277,6 +281,8 @@ public class DefaultNodeManagerPersistence implements 
NodeManagerPersistence {
       ServiceInstance emServiceInstance = new ServiceInstance();
       emServiceInstance.setApplicationName(emName);
       emServiceInstance.setInstance(emInstance);
+      
emServiceInstance.setMappingPorts(String.valueOf(emNode.getMappingPorts()));
+      
emServiceInstance.setMappingHost(String.valueOf(emNode.getMappingHost()));
       AMEMNode amemNode = new AMEMNode();
       amemNode.setMark(emNode.getMark());
       amemNode.setOwner(emNode.getOwner());
@@ -331,6 +337,8 @@ public class DefaultNodeManagerPersistence implements 
NodeManagerPersistence {
       ServiceInstance engineServiceInstance = new ServiceInstance();
       engineServiceInstance.setInstance(engineNode.getInstance());
       engineServiceInstance.setApplicationName(engineNode.getName());
+      
engineServiceInstance.setMappingPorts(String.valueOf(engineNode.getMappingPorts()));
+      
engineServiceInstance.setMappingHost(String.valueOf(engineNode.getMappingHost()));
       amEngineNode.setServiceInstance(engineServiceInstance);
       amEngineNode.setOwner(engineNode.getOwner());
       amEngineNode.setMark(engineNode.getMark());
diff --git 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
index 3d41a7b05b..3d7782c21f 100644
--- 
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
+++ 
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
@@ -66,6 +66,12 @@
             <if test="persistenceNode.identifier != null">
                 identifier = #{persistenceNode.identifier},
             </if>
+            <if test="persistenceNode.mappingPorts != null">
+                mapping_ports = #{persistenceNode.mappingPorts},
+            </if>
+            <if test="persistenceNode.mappingHost != null">
+                mapping_host = #{persistenceNode.mappingHost},
+            </if>
         </set>
         WHERE instance = #{instance}
     </update>
diff --git a/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml 
b/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
index df7117efeb..634c089220 100644
--- a/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
+++ b/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
@@ -193,6 +193,7 @@ data:
     `engine_type` varchar(32) DEFAULT NULL COMMENT 'Engine type',
     `execution_code` text DEFAULT NULL COMMENT 'Job origin code or code path',
     `result_location` varchar(500) DEFAULT NULL COMMENT 'File path of the 
resultsets',
+    `observe_info` varchar(500) DEFAULT NULL COMMENT 'The notification 
information configuration of this job',
     PRIMARY KEY (`id`),
     KEY `created_time` (`created_time`),
     KEY `submit_user` (`submit_user`)
@@ -723,6 +724,9 @@ data:
     `owner` varchar(32) COLLATE utf8_bin DEFAULT NULL,
     `mark` varchar(32) COLLATE utf8_bin DEFAULT NULL,
     `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL,
+    `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL,
+    `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL,
+    `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL,
     `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
     `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
     `updator` varchar(32) COLLATE utf8_bin DEFAULT NULL,
diff --git a/linkis-dist/package/db/linkis_ddl.sql b/linkis-dist/package/db/linkis_ddl.sql
index 3e90023a4d..9f9d800ceb 100644
--- a/linkis-dist/package/db/linkis_ddl.sql
+++ b/linkis-dist/package/db/linkis_ddl.sql
@@ -751,6 +751,8 @@ CREATE TABLE `linkis_cg_manager_service_instance` (
   `mark` varchar(32) COLLATE utf8_bin DEFAULT NULL,
   `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL,
   `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL,
+  `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL,
+  `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL,
   `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
   `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
   `updator` varchar(32) COLLATE utf8_bin DEFAULT NULL,
diff --git a/linkis-dist/package/db/linkis_ddl_pg.sql b/linkis-dist/package/db/linkis_ddl_pg.sql
index 7dceba602a..5ce8fce319 100644
--- a/linkis-dist/package/db/linkis_ddl_pg.sql
+++ b/linkis-dist/package/db/linkis_ddl_pg.sql
@@ -804,6 +804,8 @@ CREATE TABLE linkis_cg_manager_service_instance (
        mark varchar(32) NULL,
        identifier varchar(32) NULL,
        ticketId varchar(255) NULL DEFAULT NULL,
+    mapping_host varchar(128) NULL DEFAULT NULL,
+    mapping_ports varchar(128) NULL DEFAULT NULL,
        update_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP,
        create_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP,
        updator varchar(32) NULL,
diff --git a/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql b/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
index 4ac4c8cc5c..f58e190077 100644
--- a/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
+++ b/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
@@ -95,6 +95,10 @@ CREATE TABLE `linkis_ps_python_module_info` (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
 
 
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN mapping_ports varchar(128);
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN mapping_host varchar(128);
+
+
 
 
 
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 1768b77d04..f70e557066 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -309,6 +309,14 @@ public class SparkConfig {
     this.conf = conf;
   }
 
+  public void addAllConf(Map<String, String> conf) {
+    if (this.conf == null) {
+      setConf(conf);
+    } else {
+      this.conf.putAll(conf);
+    }
+  }
+
   public String getPropertiesFile() {
     return propertiesFile;
   }
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
index 9c753d862f..d65cb03c5d 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -69,6 +69,9 @@ public class YarnApplicationClusterDescriptorAdapter extends ClusterDescriptorAd
     addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
     addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
     addSparkArg(sparkLauncher, "--queue", sparkConfig.getQueue());
+    sparkConfig
+        .getConf()
+        .forEach((key, value) -> addSparkArg(sparkLauncher, "--conf", key + "=" + value));
     sparkLauncher.setAppResource(sparkConfig.getAppResource());
     sparkLauncher.setMainClass(mainClass);
     Arrays.stream(args.split("\\s+"))
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java
new file mode 100644
index 0000000000..71a0776de3
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.hooks;
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext;
+import org.apache.linkis.engineconn.common.engineconn.EngineConn;
+import org.apache.linkis.engineconn.common.hook.EngineConnHook;
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkContainerizationEngineConnHook implements EngineConnHook {
+  private static final Logger logger =
+      LoggerFactory.getLogger(SparkContainerizationEngineConnHook.class);
+
+  @Override
+  public void beforeCreateEngineConn(EngineCreationContext engineCreationContext) {
+    Map<String, String> options = engineCreationContext.getOptions();
+    String mappingHost =
+        SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST().getValue(options);
+    String mappingPorts =
+        SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS().getValue(options);
+    List<String> mappingPortList =
+        Arrays.stream(mappingPorts.trim().split(","))
+            .filter(StringUtils::isNoneEmpty)
+            .collect(Collectors.toList());
+
+    if (mappingPortList.size() == 2) {
+      logger.info(
+          "加载spark容器化配置, spark.driver.host={}, spark.driver.port={}, spark.driver.blockManager.port={}",
+          mappingHost,
+          mappingPortList.get(0),
+          mappingPortList.get(1));
+      options.put(SparkConfiguration.SPARK_DRIVER_HOST().key(), mappingHost);
+      options.put(
+          SparkConfiguration.SPARK_DRIVER_BIND_ADDRESS().key(),
+          SparkConfiguration.SPARK_DRIVER_BIND_ADDRESS().defaultValue());
+      options.put(SparkConfiguration.SPARK_DRIVER_PORT().key(), mappingPortList.get(0));
+      options.put(
+          SparkConfiguration.SPARK_DRIVER_BLOCK_MANAGER_PORT().key(), mappingPortList.get(1));
+    }
+  }
+
+  @Override
+  public void beforeExecutionExecute(
+      EngineCreationContext engineCreationContext, EngineConn engineConn) {}
+
+  @Override
+  public void afterExecutionExecute(
+      EngineCreationContext engineCreationContext, EngineConn engineConn) {}
+
+  @Override
+  public void afterEngineServerStartFailed(
+      EngineCreationContext engineCreationContext, Throwable throwable) {}
+
+  @Override
+  public void afterEngineServerStartSuccess(
+      EngineCreationContext engineCreationContext, EngineConn engineConn) {}
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
index a535e31ea0..c05631eac5 100644
--- a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
@@ -24,7 +24,7 @@ wds.linkis.engineconn.debug.enable=true
 
 
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.spark.SparkEngineConnPlugin
 
-wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook,org.apache.
 [...]
 
 linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index 9fea6ec70d..9aba046654 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -211,6 +211,20 @@ object SparkConfiguration extends Logging {
 
   val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FILENAME_KEY = "spark.sparkmeasure.outputFilename"
 
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST =
+    CommonVars("linkis.engine.containerization.mapping.host", "")
+
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS =
+    CommonVars("linkis.engine.containerization.mapping.ports", "")
+
+  val SPARK_DRIVER_HOST = CommonVars[String]("spark.driver.host", "")
+
+  val SPARK_DRIVER_PORT = CommonVars[String]("spark.driver.port", "")
+
+  val SPARK_DRIVER_BIND_ADDRESS = CommonVars[String]("spark.driver.bindAddress", "0.0.0.0")
+
+  val SPARK_DRIVER_BLOCK_MANAGER_PORT = CommonVars[String]("spark.driver.blockManager.port", "")
+
   private def getMainJarName(): String = {
     val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory])
     if (somePath.isDefined) {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index ad9786dff0..9e71c76531 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -137,6 +137,18 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
     sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options))
 
+    val conf = new util.HashMap[String, String]()
+    addSparkConf(conf, SPARK_DRIVER_HOST.key, SPARK_DRIVER_HOST.getValue(options))
+    addSparkConf(conf, SPARK_DRIVER_PORT.key, SPARK_DRIVER_PORT.getValue(options))
+    addSparkConf(conf, SPARK_DRIVER_BIND_ADDRESS.key, SPARK_DRIVER_BIND_ADDRESS.getValue(options))
+    addSparkConf(
+      conf,
+      SPARK_DRIVER_BLOCK_MANAGER_PORT.key,
+      SPARK_DRIVER_BLOCK_MANAGER_PORT.getValue(options)
+    )
+
+    sparkConfig.addAllConf(conf)
+
     logger.info(s"spark_info: ${sparkConfig}")
     sparkConfig
   }
@@ -185,6 +197,19 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     // when spark version is greater than or equal to 1.5.0
     if (master.contains("yarn")) sparkConf.set("spark.yarn.isPython", "true")
 
+    addSparkConf(sparkConf, SPARK_DRIVER_HOST.key, SPARK_DRIVER_HOST.getValue(options))
+    addSparkConf(sparkConf, SPARK_DRIVER_PORT.key, SPARK_DRIVER_PORT.getValue(options))
+    addSparkConf(
+      sparkConf,
+      SPARK_DRIVER_BIND_ADDRESS.key,
+      SPARK_DRIVER_BIND_ADDRESS.getValue(options)
+    )
+    addSparkConf(
+      sparkConf,
+      SPARK_DRIVER_BLOCK_MANAGER_PORT.key,
+      SPARK_DRIVER_BLOCK_MANAGER_PORT.getValue(options)
+    )
+
     val outputDir = createOutputDir(sparkConf)
 
     logger.info(
@@ -215,6 +240,18 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     SparkEngineSession(sc, sqlContext, sparkSession, outputDir)
   }
 
+  private def addSparkConf(conf: SparkConf, key: String, value: String): Unit = {
+    if (StringUtils.isNotEmpty(value)) {
+      conf.set(key, value)
+    }
+  }
+
+  private def addSparkConf(conf: JMap[String, String], key: String, value: String): Unit = {
+    if (StringUtils.isNotEmpty(value)) {
+      conf.put(key, value)
+    }
+  }
+
   def createSparkSession(
       outputDir: File,
       conf: SparkConf,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to