This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new bca9fe22ff Add a containerized mode to the ECM service (#5201)
bca9fe22ff is described below
commit bca9fe22ffa3f5bfaedfd18ed631ecf6d23e6c30
Author: LiuGuoHua <[email protected]>
AuthorDate: Wed Nov 27 16:26:34 2024 +0800
Add a containerized mode to the ECM service (#5201)
* Add a containerized mode to the ECM service. In this mode, particular
engines can be assigned specific IPs and ports for communicating with the
outside world. For instance, a Spark engine requires at least two ports:
spark.driver.port and spark.driver.blockManager.port.
* Supplement the SQL for database modification
* remove code that may cause compilation to fail
* improve the linkis_ddl.sql in configmap-init-sql.yaml
* Add all parameters from SparkConf.getConf to sparkLauncher.
* Add a method addAllConf to SparkConfig.
* formatted code
* Fix the missing ticketId field in the linkis_cg_manager_service_instance
table within the configmap-init-sql.yaml file.
* Fix the missing observe_info field in the
linkis_ps_job_history_group_history table within the configmap-init-sql.yaml file.
* print lm log
* add debug log
* Add fields (mapping_host, mapping_ports) to the
linkis_cg_manager_service_instance table in the DDL files.
* remove debug action
---------
Co-authored-by: peacewong <[email protected]>
---
.github/workflows/integration-test.yml | 5 +-
.../org/apache/linkis/common/ServiceInstance.scala | 20 ++++++
.../enums/MappingPortStrategyName.java | 39 ++++++++++
.../strategy/MappingPortContext.java | 32 +++++++++
.../strategy/MappingPortStrategy.java | 24 +++++++
.../strategy/StaticMappingPortStrategy.java | 79 +++++++++++++++++++++
.../ecm/core/conf/ContainerizationConf.scala | 44 ++++++++++++
.../ecm/core/launch/ProcessEngineConnLaunch.scala | 45 ++++++++++++
.../impl/AbstractEngineConnLaunchService.scala | 4 +-
.../linkis/manager/am/vo/AMEngineNodeVo.java | 20 ++++++
.../apache/linkis/manager/am/utils/AMUtils.scala | 2 +
.../common/entity/persistence/PersistenceNode.java | 19 +++++
.../impl/DefaultNodeManagerPersistence.java | 8 +++
.../resources/mapper/common/NodeManagerMapper.xml | 6 ++
.../linkis/templates/configmap-init-sql.yaml | 4 ++
linkis-dist/package/db/linkis_ddl.sql | 2 +
linkis-dist/package/db/linkis_ddl_pg.sql | 2 +
.../db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql | 4 ++
.../spark/client/context/SparkConfig.java | 8 +++
.../YarnApplicationClusterDescriptorAdapter.java | 3 +
.../hooks/SparkContainerizationEngineConnHook.java | 82 ++++++++++++++++++++++
.../main/resources/linkis-engineconn.properties | 2 +-
.../spark/config/SparkConfiguration.scala | 14 ++++
.../spark/factory/SparkEngineConnFactory.scala | 37 ++++++++++
24 files changed, 500 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/integration-test.yml
b/.github/workflows/integration-test.yml
index 85d3b9a23e..6e4e5097a2 100644
--- a/.github/workflows/integration-test.yml
+++ b/.github/workflows/integration-test.yml
@@ -166,7 +166,7 @@ jobs:
# Show port-forward list
bash ./linkis-dist/helm/scripts/remote-proxy.sh list
# Check if the web service is available
- curl http://127.0.0.1:8088/indexhtml
+ curl http://127.0.0.1:8088/
# Execute test by linkis-cli
POD_NAME=`kubectl get pods -n linkis -l
app.kubernetes.io/instance=linkis-demo-mg-gateway -o
jsonpath='{.items[0].metadata.name}'`
@@ -182,5 +182,4 @@ jobs:
#kubectl exec -it -n linkis ${POD_NAME} -- bash -c " \
#sh /opt/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType sql
-code 'show databases' "
-
- shell: bash
\ No newline at end of file
+ shell: bash
diff --git
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
index f9e4718472..08974df281 100644
---
a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
+++
b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala
@@ -21,11 +21,19 @@ class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
+ private var mappingPorts: String = _
+ private var mappingHost: String = _
def setApplicationName(applicationName: String): Unit = this.applicationName
= applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance
+ def setMappingPorts(mappingPorts: String): Unit = this.mappingPorts =
mappingPorts
+ def getMappingPorts: String = mappingPorts
+
+ def setMappingHost(mappingHost: String): Unit = this.mappingHost =
mappingHost
+ def getMappingHost: String = mappingHost
+
def setRegistryTimestamp(registryTimestamp: Long): Unit =
this.registryTimestamp =
registryTimestamp
@@ -62,6 +70,18 @@ object ServiceInstance {
serviceInstance
}
+ def apply(
+ applicationName: String,
+ instance: String,
+ mappingPorts: String,
+ mappingHost: String
+ ): ServiceInstance = {
+ val serviceInstance = apply(applicationName, instance)
+ serviceInstance.setMappingPorts(mappingPorts)
+ serviceInstance.setMappingHost(mappingHost)
+ serviceInstance
+ }
+
def apply(applicationName: String, instance: String, registryTimestamp:
Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java
new file mode 100644
index 0000000000..7d43fc9ae0
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/enums/MappingPortStrategyName.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.enums;
+
+public enum MappingPortStrategyName {
+ STATIC("static");
+ private String name;
+
+ MappingPortStrategyName(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public static MappingPortStrategyName toEnum(String name) {
+ return MappingPortStrategyName.valueOf(name.toUpperCase());
+ }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java
new file mode 100644
index 0000000000..ef31b2562d
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.strategy;
+
+import
org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName;
+
+public class MappingPortContext {
+
+ public static MappingPortStrategy getInstance(MappingPortStrategyName
strategyName) {
+ switch (strategyName) {
+ case STATIC:
+ return new StaticMappingPortStrategy();
+ default:
+ return new StaticMappingPortStrategy();
+ }
+ }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java
new file mode 100644
index 0000000000..21cf2ac3a0
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/MappingPortStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.strategy;
+
+import java.io.IOException;
+
+public interface MappingPortStrategy {
+ int availablePort() throws IOException;
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java
new file mode 100644
index 0000000000..6ea627093f
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/java/org/apache/linkis/ecm/core/containerization/strategy/StaticMappingPortStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.containerization.strategy;
+
+import org.apache.linkis.ecm.core.conf.ContainerizationConf;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StaticMappingPortStrategy implements MappingPortStrategy {
+
+ private static final AtomicInteger currentIndex = new AtomicInteger(0);
+
+ @Override
+ public int availablePort() throws IOException {
+ return getNewPort(10);
+ }
+
+ public int getNewPort(int retryNum) throws IOException {
+ int[] portRange = getPortRange();
+ if (retryNum == 0) {
+ throw new IOException(
+ "No available port in the portRange: "
+ +
ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE()
+ .getValue());
+ }
+ moveIndex();
+ int minPort = portRange[0];
+ int newPort = minPort + currentIndex.get() - 1;
+ ServerSocket socket = null;
+ try {
+ socket = new ServerSocket(newPort);
+ } catch (Exception e) {
+ return getNewPort(--retryNum);
+ } finally {
+ IOUtils.close(socket);
+ }
+ return newPort;
+ }
+
+ private synchronized void moveIndex() {
+ int poolSize = getPoolSize();
+ currentIndex.set(currentIndex.get() % poolSize + 1);
+ }
+
+ private int[] getPortRange() {
+ String portRange =
+
ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE().getValue();
+
+ return
Arrays.stream(portRange.split("-")).mapToInt(Integer::parseInt).toArray();
+ }
+
+ private int getPoolSize() {
+ int[] portRange = getPortRange();
+ int minPort = portRange[0];
+ int maxPort = portRange[1];
+
+ return maxPort - minPort + 1;
+ }
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala
new file mode 100644
index 0000000000..a7461210f1
--- /dev/null
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/conf/ContainerizationConf.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.core.conf
+
+import org.apache.linkis.common.conf.CommonVars
+
+object ContainerizationConf {
+
+ val ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE =
+ CommonVars("linkis.engine.containerization.static.port.range", "1-65535")
+
+ val ENGINE_CONN_CONTAINERIZATION_ENABLE =
+ CommonVars("linkis.engine.containerization.enable", false).getValue
+
+ val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST =
+ CommonVars("linkis.engine.containerization.mapping.host", "")
+
+ val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS =
+ CommonVars("linkis.engine.containerization.mapping.ports", "")
+
+ val ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY =
+ CommonVars("linkis.engine.containerization.mapping.strategy", "static")
+
+ // 引擎类型-需要开启的端口数量
+ // Engine Type - Number of Ports Required to Be Opened
+ val ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST =
+ CommonVars("linkis.engine.containerization.engine.list", "spark-2,")
+
+}
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
index cc79e24d4f..4dfb96b3d5 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
@@ -20,6 +20,15 @@ package org.apache.linkis.ecm.core.launch
import org.apache.linkis.common.conf.{CommonVars, Configuration}
import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.ecm.core.conf.ContainerizationConf.{
+ ENGINE_CONN_CONTAINERIZATION_ENABLE,
+ ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST,
+ ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST,
+ ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS,
+ ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY
+}
+import
org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName
+import org.apache.linkis.ecm.core.containerization.strategy.MappingPortContext
import org.apache.linkis.ecm.core.errorcode.LinkisECMErrorCodeSummary._
import org.apache.linkis.ecm.core.exception.ECMCoreException
import org.apache.linkis.ecm.core.utils.PortUtils
@@ -35,6 +44,7 @@ import
org.apache.linkis.manager.engineplugin.common.launch.process.{
}
import
org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
import
org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants._
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
@@ -54,6 +64,9 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with
Logging {
private var engineConnPort: String = _
+ private var mappingPorts: String = ""
+ private var mappingHost: String = _
+
protected def newProcessEngineConnCommandBuilder():
ProcessEngineCommandBuilder =
new UnixProcessEngineCommandBuilder
@@ -142,6 +155,10 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch
with Logging {
def getEngineConnPort: String = engineConnPort
+ def getMappingPorts: String = mappingPorts
+
+ def getMappingHost: String = mappingHost
+
protected def getProcess(): Process = this.process
/**
@@ -166,6 +183,20 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch
with Logging {
.findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue)
.toString
+ val engineType = LabelUtil.getEngineType(request.labels)
+ var engineMappingPortSize = getEngineMappingPortSize(engineType)
+ if (ENGINE_CONN_CONTAINERIZATION_ENABLE && engineMappingPortSize > 0) {
+ val strategyName = ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY.getValue
+ val mappingPortStrategy =
+
MappingPortContext.getInstance(MappingPortStrategyName.toEnum(strategyName))
+
+ while (engineMappingPortSize > 0) {
+ mappingPorts += mappingPortStrategy.availablePort() + ","
+ engineMappingPortSize = engineMappingPortSize - 1
+ }
+ mappingHost = ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.getValue
+ }
+
var springConf =
Map[String, String]("server.port" -> engineConnPort,
"spring.profiles.active" -> "engineconn")
val properties =
@@ -188,10 +219,24 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch
with Logging {
engineConnConf = engineConnConf ++: request.creationDesc.properties.asScala
.filterNot(_._1.startsWith("spring."))
.toMap
+
+ engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS.key ->
mappingPorts)
+ engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.key ->
mappingHost)
+
arguments.addEngineConnConf(engineConnConf)
EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToArgs(arguments.build())
}
+ def getEngineMappingPortSize(engineType: String): Int = {
+ val engineList = ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST.getValue
+ val infoList = engineList.trim
+ .split(",")
+ .map(_.split("-"))
+ .filter(engine => engine(0).equals(engineType))
+ if (infoList.length > 0) infoList(0)(1).toInt
+ else 0
+ }
+
override def kill(): Unit = {
if (process != null) {
process.destroy()
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index df00ed4960..771ceede8f 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -92,7 +92,9 @@ abstract class AbstractEngineConnLaunchService extends
EngineConnLaunchService w
case pro: ProcessEngineConnLaunch =>
val serviceInstance = ServiceInstance(
GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue,
- ECMUtils.getInstanceByPort(pro.getEngineConnPort)
+ ECMUtils.getInstanceByPort(pro.getEngineConnPort),
+ pro.getMappingPorts,
+ pro.getMappingHost
)
conn.setServiceInstance(serviceInstance)
case _ =>
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
index d7208f211a..c88f696973 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/vo/AMEngineNodeVo.java
@@ -80,6 +80,10 @@ public class AMEngineNodeVo {
private String engineType;
+ private String mappingPorts;
+
+ private String mappingHost;
+
public String getEmInstance() {
return emInstance;
}
@@ -288,4 +292,20 @@ public class AMEngineNodeVo {
public void setEngineType(String engineType) {
this.engineType = engineType;
}
+
+ public String getMappingPorts() {
+ return mappingPorts;
+ }
+
+ public void setMappingPorts(String mappingPorts) {
+ this.mappingPorts = mappingPorts;
+ }
+
+ public String getMappingHost() {
+ return mappingHost;
+ }
+
+ public void setMappingHost(String mappingHost) {
+ this.mappingHost = mappingHost;
+ }
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
index 428e5d23e9..a2f4ad97ae 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
@@ -167,6 +167,8 @@ object AMUtils {
AMEngineNodeVo.setLabels(node.getLabels)
AMEngineNodeVo.setApplicationName(node.getServiceInstance.getApplicationName)
AMEngineNodeVo.setInstance(node.getServiceInstance.getInstance)
+ AMEngineNodeVo.setMappingHost(node.getServiceInstance.getMappingHost)
+ AMEngineNodeVo.setMappingPorts(node.getServiceInstance.getMappingPorts)
if (null != node.getEMNode) {
AMEngineNodeVo.setEmInstance(node.getEMNode.getServiceInstance.getInstance)
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
index 770a2e528a..026f5c6453 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
@@ -36,6 +36,9 @@ public class PersistenceNode {
private String updator;
private String creator;
+ private String mappingPorts;
+ private String mappingHost;
+
public String getMark() {
return mark;
}
@@ -123,4 +126,20 @@ public class PersistenceNode {
public void setCreator(String creator) {
this.creator = creator;
}
+
+ public String getMappingPorts() {
+ return mappingPorts;
+ }
+
+ public void setMappingPorts(String mappingPorts) {
+ this.mappingPorts = mappingPorts;
+ }
+
+ public String getMappingHost() {
+ return mappingHost;
+ }
+
+ public void setMappingHost(String mappingHost) {
+ this.mappingHost = mappingHost;
+ }
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 31a8742d9f..40f479f496 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -109,6 +109,10 @@ public class DefaultNodeManagerPersistence implements
NodeManagerPersistence {
node.getOwner()); // The creator is not given when inserting records
in rm, so you need to
// set this value(rm中插入记录的时候并未给出creator,所以需要set这个值)
persistenceNode.setUpdator(node.getOwner());
+ String mappingPorts = node.getServiceInstance().getMappingPorts();
+ persistenceNode.setMappingPorts(mappingPorts);
+ String mappingHost = node.getServiceInstance().getMappingHost();
+ persistenceNode.setMappingHost(mappingHost);
try {
nodeManagerMapper.updateNodeInstance(serviceInstance.getInstance(),
persistenceNode);
nodeManagerMapper.updateNodeRelation(
@@ -277,6 +281,8 @@ public class DefaultNodeManagerPersistence implements
NodeManagerPersistence {
ServiceInstance emServiceInstance = new ServiceInstance();
emServiceInstance.setApplicationName(emName);
emServiceInstance.setInstance(emInstance);
+
emServiceInstance.setMappingPorts(String.valueOf(emNode.getMappingPorts()));
+
emServiceInstance.setMappingHost(String.valueOf(emNode.getMappingHost()));
AMEMNode amemNode = new AMEMNode();
amemNode.setMark(emNode.getMark());
amemNode.setOwner(emNode.getOwner());
@@ -331,6 +337,8 @@ public class DefaultNodeManagerPersistence implements
NodeManagerPersistence {
ServiceInstance engineServiceInstance = new ServiceInstance();
engineServiceInstance.setInstance(engineNode.getInstance());
engineServiceInstance.setApplicationName(engineNode.getName());
+
engineServiceInstance.setMappingPorts(String.valueOf(engineNode.getMappingPorts()));
+
engineServiceInstance.setMappingHost(String.valueOf(engineNode.getMappingHost()));
amEngineNode.setServiceInstance(engineServiceInstance);
amEngineNode.setOwner(engineNode.getOwner());
amEngineNode.setMark(engineNode.getMark());
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
index 3d41a7b05b..3d7782c21f 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
@@ -66,6 +66,12 @@
<if test="persistenceNode.identifier != null">
identifier = #{persistenceNode.identifier},
</if>
+ <if test="persistenceNode.mappingPorts != null">
+ mapping_ports = #{persistenceNode.mappingPorts},
+ </if>
+ <if test="persistenceNode.mappingHost != null">
+ mapping_host = #{persistenceNode.mappingHost},
+ </if>
</set>
WHERE instance = #{instance}
</update>
diff --git a/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
b/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
index df7117efeb..634c089220 100644
--- a/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
+++ b/linkis-dist/helm/charts/linkis/templates/configmap-init-sql.yaml
@@ -193,6 +193,7 @@ data:
`engine_type` varchar(32) DEFAULT NULL COMMENT 'Engine type',
`execution_code` text DEFAULT NULL COMMENT 'Job origin code or code path',
`result_location` varchar(500) DEFAULT NULL COMMENT 'File path of the
resultsets',
+ `observe_info` varchar(500) DEFAULT NULL COMMENT 'The notification
information configuration of this job',
PRIMARY KEY (`id`),
KEY `created_time` (`created_time`),
KEY `submit_user` (`submit_user`)
@@ -723,6 +724,9 @@ data:
`owner` varchar(32) COLLATE utf8_bin DEFAULT NULL,
`mark` varchar(32) COLLATE utf8_bin DEFAULT NULL,
`identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL,
+ `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL,
+ `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL,
+ `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`updator` varchar(32) COLLATE utf8_bin DEFAULT NULL,
diff --git a/linkis-dist/package/db/linkis_ddl.sql
b/linkis-dist/package/db/linkis_ddl.sql
index 3e90023a4d..9f9d800ceb 100644
--- a/linkis-dist/package/db/linkis_ddl.sql
+++ b/linkis-dist/package/db/linkis_ddl.sql
@@ -751,6 +751,8 @@ CREATE TABLE `linkis_cg_manager_service_instance` (
`mark` varchar(32) COLLATE utf8_bin DEFAULT NULL,
`identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL,
`ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL,
+ `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL,
+ `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`updator` varchar(32) COLLATE utf8_bin DEFAULT NULL,
diff --git a/linkis-dist/package/db/linkis_ddl_pg.sql
b/linkis-dist/package/db/linkis_ddl_pg.sql
index 7dceba602a..5ce8fce319 100644
--- a/linkis-dist/package/db/linkis_ddl_pg.sql
+++ b/linkis-dist/package/db/linkis_ddl_pg.sql
@@ -804,6 +804,8 @@ CREATE TABLE linkis_cg_manager_service_instance (
mark varchar(32) NULL,
identifier varchar(32) NULL,
ticketId varchar(255) NULL DEFAULT NULL,
+ mapping_host varchar(128) NULL DEFAULT NULL,
+ mapping_ports varchar(128) NULL DEFAULT NULL,
update_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP,
create_time timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP,
updator varchar(32) NULL,
diff --git a/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
b/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
index 4ac4c8cc5c..f58e190077 100644
--- a/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
+++ b/linkis-dist/package/db/upgrade/1.7.0_schema/mysql/linkis_ddl.sql
@@ -95,6 +95,10 @@ CREATE TABLE `linkis_ps_python_module_info` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
+-- Containerized ECM mode: host and ports mapped for an engine instance.
+-- Column definitions and relative order mirror the CREATE TABLE in linkis_ddl.sql
+-- so upgraded and freshly installed schemas agree.
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `mapping_host` varchar(128) COLLATE utf8_bin DEFAULT NULL;
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `mapping_ports` varchar(128) COLLATE utf8_bin DEFAULT NULL;
+
+
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 1768b77d04..f70e557066 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -309,6 +309,14 @@ public class SparkConfig {
this.conf = conf;
}
+  /**
+   * Merges the given Spark properties into this configuration's {@code conf} map.
+   *
+   * <p>A {@code null} argument is ignored. When no map has been set yet, a copy of the argument is
+   * installed (even when it is empty), so later merges never write into the caller's map.
+   *
+   * @param conf Spark properties to merge; entries with an existing key overwrite the old value
+   */
+  public void addAllConf(Map<String, String> conf) {
+    if (conf == null) {
+      // Nothing to merge; also avoids the NullPointerException putAll(null) would raise.
+      return;
+    }
+    if (this.conf == null) {
+      // Copy instead of aliasing so subsequent putAll calls do not mutate the caller's map.
+      setConf(new java.util.HashMap<>(conf));
+    } else {
+      this.conf.putAll(conf);
+    }
+  }
+
public String getPropertiesFile() {
return propertiesFile;
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
index 9c753d862f..d65cb03c5d 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -69,6 +69,9 @@ public class YarnApplicationClusterDescriptorAdapter extends
ClusterDescriptorAd
addSparkArg(sparkLauncher, "--principal", sparkConfig.getPrincipal());
addSparkArg(sparkLauncher, "--keytab", sparkConfig.getKeytab());
addSparkArg(sparkLauncher, "--queue", sparkConfig.getQueue());
+ sparkConfig
+ .getConf()
+ .forEach((key, value) -> addSparkArg(sparkLauncher, "--conf", key +
"=" + value));
sparkLauncher.setAppResource(sparkConfig.getAppResource());
sparkLauncher.setMainClass(mainClass);
Arrays.stream(args.split("\\s+"))
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java
new file mode 100644
index 0000000000..71a0776de3
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/hooks/SparkContainerizationEngineConnHook.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.hooks;
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext;
+import org.apache.linkis.engineconn.common.engineconn.EngineConn;
+import org.apache.linkis.engineconn.common.hook.EngineConnHook;
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Engine-conn hook that, before the engine conn is created, copies the containerization mapping
+ * host and ports from the creation options into the Spark driver network options
+ * (driver host, bind address, driver port and block manager port).
+ */
+public class SparkContainerizationEngineConnHook implements EngineConnHook {
+  private static final Logger logger =
+      LoggerFactory.getLogger(SparkContainerizationEngineConnHook.class);
+
+  /** Ports consumed by this hook: one for the driver, one for the driver block manager. */
+  private static final int REQUIRED_MAPPING_PORTS = 2;
+
+  /**
+   * Reads the mapping host and the comma-separated mapping ports from the creation options and,
+   * when exactly two ports are configured, writes the Spark driver host, bind address, driver
+   * port and block manager port back into the same options map.
+   *
+   * @param engineCreationContext context whose options map is read and updated in place
+   */
+  @Override
+  public void beforeCreateEngineConn(EngineCreationContext engineCreationContext) {
+    Map<String, String> options = engineCreationContext.getOptions();
+    String mappingHost =
+        SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST().getValue(options);
+    String mappingPorts =
+        SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS().getValue(options);
+    // Trim every entry so a value such as "7077, 7078" does not yield a port with a leading space.
+    List<String> mappingPortList =
+        Arrays.stream(mappingPorts.split(","))
+            .map(String::trim)
+            .filter(StringUtils::isNotEmpty)
+            .collect(Collectors.toList());
+
+    if (mappingPortList.size() != REQUIRED_MAPPING_PORTS) {
+      // No ports configured means containerized mode is not in use; stay silent in that case.
+      if (!mappingPortList.isEmpty()) {
+        logger.warn(
+            "Ignoring {}={}: expected exactly {} comma-separated ports but found {}",
+            SparkConfiguration.ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS().key(),
+            mappingPorts,
+            REQUIRED_MAPPING_PORTS,
+            mappingPortList.size());
+      }
+      return;
+    }
+
+    String driverPort = mappingPortList.get(0);
+    String blockManagerPort = mappingPortList.get(1);
+    logger.info(
+        "加载spark容器化配置, spark.driver.host={}, spark.driver.port={}, spark.driver.blockManager.port={}",
+        mappingHost,
+        driverPort,
+        blockManagerPort);
+    options.put(SparkConfiguration.SPARK_DRIVER_HOST().key(), mappingHost);
+    // The bind address is always set to the configured default, not taken from the mapping.
+    options.put(
+        SparkConfiguration.SPARK_DRIVER_BIND_ADDRESS().key(),
+        SparkConfiguration.SPARK_DRIVER_BIND_ADDRESS().defaultValue());
+    options.put(SparkConfiguration.SPARK_DRIVER_PORT().key(), driverPort);
+    options.put(SparkConfiguration.SPARK_DRIVER_BLOCK_MANAGER_PORT().key(), blockManagerPort);
+  }
+
+  /** No-op: this hook only acts before the engine conn is created. */
+  @Override
+  public void beforeExecutionExecute(
+      EngineCreationContext engineCreationContext, EngineConn engineConn) {}
+
+  /** No-op: this hook only acts before the engine conn is created. */
+  @Override
+  public void afterExecutionExecute(
+      EngineCreationContext engineCreationContext, EngineConn engineConn) {}
+
+  /** No-op: this hook only acts before the engine conn is created. */
+  @Override
+  public void afterEngineServerStartFailed(
+      EngineCreationContext engineCreationContext, Throwable throwable) {}
+
+  /** No-op: this hook only acts before the engine conn is created. */
+  @Override
+  public void afterEngineServerStartSuccess(
+      EngineCreationContext engineCreationContext, EngineConn engineConn) {}
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
index a535e31ea0..c05631eac5 100644
---
a/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
+++
b/linkis-engineconn-plugins/spark/src/main/resources/linkis-engineconn.properties
@@ -24,7 +24,7 @@ wds.linkis.engineconn.debug.enable=true
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.spark.SparkEngineConnPlugin
-wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.PyUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.ScalaUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.SparkInitSQLHook,org.apache.linkis.engineconn.computation.executor.hook.PythonSparkEngineHook,org.apache.
[...]
linkis.spark.once.yarn.restful.url=http://127.0.0.1:8088
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index 9fea6ec70d..9aba046654 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -211,6 +211,20 @@ object SparkConfiguration extends Logging {
val SPARKMEASURE_FLIGHT_RECORDER_OUTPUT_FILENAME_KEY =
"spark.sparkmeasure.outputFilename"
+  // Mapping host; SparkContainerizationEngineConnHook copies it into spark.driver.host.
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST =
+    CommonVars[String]("linkis.engine.containerization.mapping.host", "")
+
+  // Comma-separated mapping ports; the hook applies them only when exactly two are given
+  // (first: spark.driver.port, second: spark.driver.blockManager.port).
+  val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS =
+    CommonVars[String]("linkis.engine.containerization.mapping.ports", "")
+
+  // Spark driver network settings; all but the bind address default to empty.
+  val SPARK_DRIVER_HOST =
+    CommonVars[String]("spark.driver.host", "")
+
+  val SPARK_DRIVER_PORT =
+    CommonVars[String]("spark.driver.port", "")
+
+  val SPARK_DRIVER_BIND_ADDRESS =
+    CommonVars[String]("spark.driver.bindAddress", "0.0.0.0")
+
+  val SPARK_DRIVER_BLOCK_MANAGER_PORT =
+    CommonVars[String]("spark.driver.blockManager.port", "")
+
private def getMainJarName(): String = {
val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory])
if (somePath.isDefined) {
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index ad9786dff0..9e71c76531 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -137,6 +137,18 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
sparkConfig.setQueue(LINKIS_QUEUE_NAME.getValue(options))
sparkConfig.setPyFiles(SPARK_PYTHON_FILES.getValue(options))
+ val conf = new util.HashMap[String, String]()
+ addSparkConf(conf, SPARK_DRIVER_HOST.key,
SPARK_DRIVER_HOST.getValue(options))
+ addSparkConf(conf, SPARK_DRIVER_PORT.key,
SPARK_DRIVER_PORT.getValue(options))
+ addSparkConf(conf, SPARK_DRIVER_BIND_ADDRESS.key,
SPARK_DRIVER_BIND_ADDRESS.getValue(options))
+ addSparkConf(
+ conf,
+ SPARK_DRIVER_BLOCK_MANAGER_PORT.key,
+ SPARK_DRIVER_BLOCK_MANAGER_PORT.getValue(options)
+ )
+
+ sparkConfig.addAllConf(conf)
+
logger.info(s"spark_info: ${sparkConfig}")
sparkConfig
}
@@ -185,6 +197,19 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
// when spark version is greater than or equal to 1.5.0
if (master.contains("yarn")) sparkConf.set("spark.yarn.isPython", "true")
+ addSparkConf(sparkConf, SPARK_DRIVER_HOST.key,
SPARK_DRIVER_HOST.getValue(options))
+ addSparkConf(sparkConf, SPARK_DRIVER_PORT.key,
SPARK_DRIVER_PORT.getValue(options))
+ addSparkConf(
+ sparkConf,
+ SPARK_DRIVER_BIND_ADDRESS.key,
+ SPARK_DRIVER_BIND_ADDRESS.getValue(options)
+ )
+ addSparkConf(
+ sparkConf,
+ SPARK_DRIVER_BLOCK_MANAGER_PORT.key,
+ SPARK_DRIVER_BLOCK_MANAGER_PORT.getValue(options)
+ )
+
val outputDir = createOutputDir(sparkConf)
logger.info(
@@ -215,6 +240,18 @@ class SparkEngineConnFactory extends
MultiExecutorEngineConnFactory with Logging
SparkEngineSession(sc, sqlContext, sparkSession, outputDir)
}
+  /** Sets `key` on the Spark conf only when `value` is neither null nor empty. */
+  private def addSparkConf(conf: SparkConf, key: String, value: String): Unit =
+    Option(value).filter(_.nonEmpty).foreach(present => conf.set(key, present))
+
+  /** Puts `key` into the map only when `value` is neither null nor empty. */
+  private def addSparkConf(conf: JMap[String, String], key: String, value: String): Unit = {
+    val hasValue = value != null && value.nonEmpty
+    if (hasValue) conf.put(key, value)
+  }
+
def createSparkSession(
outputDir: File,
conf: SparkConf,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]