This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 50f4a7356426fa2bf9d51d48fca9c4bb7c2ab450
Author: liujinhui <[email protected]>
AuthorDate: Tue Mar 7 15:46:14 2023 +0800

    [Enhancement](spark load) Support for RM HA (#15000)
    
    Add ResourceManager HA configuration to Spark Load. Spark already
    accepts HA parameters through its configuration; the DDL just needs to
    accept them and pass them through:
    
    CREATE EXTERNAL RESOURCE spark_resource_sinan_node_manager_ha
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.executor.memory" = "10g",
        "spark.yarn.queue" = "XXXX",
        "spark.hadoop.yarn.resourcemanager.address" = "XXXX:8032",
        "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
        "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
        "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "XXXX",
        "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "XXXX",
        "spark.hadoop.fs.defaultFS" = "hdfs://XXXX",
        "spark.hadoop.dfs.nameservices" = "hacluster",
        "spark.hadoop.dfs.ha.namenodes.hacluster" = "mynamenode1,mynamenode2",
        "spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode1" = "XXX:8020",
        "spark.hadoop.dfs.namenode.rpc-address.hacluster.mynamenode2" = "XXXX:8020",
        "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
        "working_dir" = "hdfs://XXXX/doris_prd_data/sinan/spark_load/",
        "broker" = "broker_personas",
        "broker.username" = "hdfs",
        "broker.password" = "",
        "broker.dfs.nameservices" = "XXX",
        "broker.dfs.ha.namenodes.XXX" = "mynamenode1, mynamenode2",
        "broker.dfs.namenode.rpc-address.XXXX.mynamenode1" = "XXXX:8020",
        "broker.dfs.namenode.rpc-address.XXXX.mynamenode2" = "XXXX:8020",
        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
    
    Co-authored-by: liujh <[email protected]>
---
 .../org/apache/doris/catalog/SparkResource.java   | 35 ++++++++++++++++++----
 .../apache/doris/catalog/SparkResourceTest.java   | 16 ++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index ac761b5fb6..98ebe54b96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -80,6 +81,10 @@ public class SparkResource extends Resource {
     private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
     private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
     private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
+    private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
+    private static final String YARN_RESOURCE_MANAGER_ADDRESS_FORMAT = "spark.hadoop.yarn.resourcemanager.address.%s";
+    private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";
 
     public enum DeployMode {
         CLUSTER,
@@ -283,11 +288,31 @@ public class SparkResource extends Resource {
             throw new DdlException("Missing " + SPARK_SUBMIT_DEPLOY_MODE + " in properties");
         }
         // if deploy machines do not set HADOOP_CONF_DIR env, we should set these configs below
-        if ((!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)
-                || !sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS))
-                && isYarnMaster()) {
-            throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + " and " + SPARK_FS_DEFAULT_FS
-                    + ") in yarn master");
+        if (isYarnMaster()) {
+            if (!sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS)) {
+                throw new DdlException("Missing (" + SPARK_FS_DEFAULT_FS + ") in yarn master");
+            }
+
+            String haEnabled = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED);
+            if (StringUtils.isNotEmpty(haEnabled) && "true".equals(haEnabled)) {
+                if (StringUtils.isEmpty(sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS))) {
+                    throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS + ") in yarn master "
+                            + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                }
+
+                String[] haIds = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS).split(",");
+                for (String haId : haIds) {
+                    String addressKey = String.format(YARN_RESOURCE_MANAGER_ADDRESS_FORMAT, haId);
+                    String hostnameKey = String.format(YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT, haId);
+                    if (!sparkConfigs.containsKey(addressKey) && !sparkConfigs.containsKey(hostnameKey)) {
+                        throw new DdlException("Missing " + addressKey + " or " + hostnameKey + " in yarn master "
+                                + "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
+                    }
+                }
+            } else if (!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)) {
+                throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + ") in yarn master "
+                        + "when HA is not enabled.");
+            }
+        }
 
         // check working dir and broker
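As a quick illustration of the checks the hunk above introduces, here is a
minimal standalone sketch of the same validation rules. This is not Doris
code: the class name YarnHaConfigCheck and the validate method are made up
for this example, a plain java.util map stands in for the resource's Spark
config, and IllegalArgumentException replaces DdlException.

    import java.util.HashMap;
    import java.util.Map;

    public class YarnHaConfigCheck {
        private static final String FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
        private static final String RM_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
        private static final String RM_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
        private static final String RM_HA_RM_IDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
        private static final String RM_ADDRESS_FORMAT = "spark.hadoop.yarn.resourcemanager.address.%s";
        private static final String RM_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";

        // Same rules as the SparkResource hunk above: defaultFS is always
        // required on a yarn master; with HA enabled, every rm-id listed in
        // ha.rm-ids needs an address or a hostname key; without HA, the
        // single resourcemanager address is required instead.
        static void validate(Map<String, String> conf) {
            if (!conf.containsKey(FS_DEFAULT_FS)) {
                throw new IllegalArgumentException("Missing " + FS_DEFAULT_FS);
            }
            if ("true".equals(conf.get(RM_HA_ENABLED))) {
                String rmIds = conf.get(RM_HA_RM_IDS);
                if (rmIds == null || rmIds.isEmpty()) {
                    throw new IllegalArgumentException("Missing " + RM_HA_RM_IDS);
                }
                for (String rmId : rmIds.split(",")) {
                    String addressKey = String.format(RM_ADDRESS_FORMAT, rmId);
                    String hostnameKey = String.format(RM_HOSTNAME_FORMAT, rmId);
                    if (!conf.containsKey(addressKey) && !conf.containsKey(hostnameKey)) {
                        throw new IllegalArgumentException("Missing " + addressKey + " or " + hostnameKey);
                    }
                }
            } else if (!conf.containsKey(RM_ADDRESS)) {
                throw new IllegalArgumentException("Missing " + RM_ADDRESS);
            }
        }

        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            conf.put(FS_DEFAULT_FS, "hdfs://127.0.0.1:10000");
            conf.put(RM_HA_ENABLED, "true");
            conf.put(RM_HA_RM_IDS, "rm1,rm2");
            conf.put(String.format(RM_HOSTNAME_FORMAT, "rm1"), "host1");
            conf.put(String.format(RM_HOSTNAME_FORMAT, "rm2"), "host2");
            validate(conf); // passes: every rm-id resolves to a hostname
            conf.remove(String.format(RM_HOSTNAME_FORMAT, "rm2"));
            try {
                validate(conf); // rm2 now has neither an address nor a hostname
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }

The main method shows both sides of the per-rm-id rule: validation passes
while every id in ha.rm-ids has a hostname (or address) entry, and fails as
soon as rm2 loses its entry, which is exactly the case the new DdlException
messages report.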
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
index dc2436c434..0291e122bc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
@@ -117,6 +117,22 @@ public class SparkResourceTest {
         BaseProcResult result = new BaseProcResult();
         resource.getProcNodeData(result);
         Assert.assertEquals(9, result.getRows().size());
+
+        properties.clear();
+        properties.put("type", type);
+        properties.put("spark.master", "yarn");
+        properties.put("spark.submit.deployMode", "cluster");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.enabled", "true");
+        properties.put("spark.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm1", "host1");
+        properties.put("spark.hadoop.yarn.resourcemanager.hostname.rm2", "host2");
+        properties.put("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:10000");
+        stmt = new CreateResourceStmt(true, false, name, properties);
+        stmt.analyze(analyzer);
+        resource = (SparkResource) Resource.fromStmt(stmt);
+        Assert.assertTrue(resource.isYarnMaster());
+        map = resource.getSparkConfigs();
+        Assert.assertEquals(7, map.size());
     }
 
     @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
