This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6c78c8ec9a [Improvement][Spark] Support Local Spark Cluster (#15589)
6c78c8ec9a is described below

commit 6c78c8ec9a5a85a5792aaaae8d7c3e8fe47012c9
Author: John Huang <[email protected]>
AuthorDate: Mon Apr 22 15:05:59 2024 +0800

    [Improvement][Spark] Support Local Spark Cluster (#15589)
    
    * [Improvement][Spark] Support Local Spark Cluster
    
    * remove default 'local' from deploy mode
    
    ---------
    
    Co-authored-by: Rick Cheng <[email protected]>
---
 docs/docs/en/guide/task/spark.md                   |  1 +
 docs/docs/zh/guide/task/spark.md                   |  1 +
 .../plugin/task/spark/SparkParameters.java         |  5 ++
 .../plugin/task/spark/SparkTask.java               | 23 ++++--
 .../plugin/task/spark/SparkParametersTest.java     |  1 -
 .../plugin/task/spark/SparkTaskTest.java           | 93 ++++++++++++++++++++--
 dolphinscheduler-ui/src/locales/en_US/project.ts   |  2 +
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |  2 +
 .../task/components/node/fields/use-spark.ts       | 26 ++++++
 .../projects/task/components/node/format-data.ts   |  1 +
 .../task/components/node/tasks/use-spark.ts        |  3 +-
 11 files changed, 143 insertions(+), 15 deletions(-)

diff --git a/docs/docs/en/guide/task/spark.md b/docs/docs/en/guide/task/spark.md
index 3e0f83b253..930f2cd0b0 100644
--- a/docs/docs/en/guide/task/spark.md
+++ b/docs/docs/en/guide/task/spark.md
@@ -24,6 +24,7 @@ Spark task type for executing Spark application. When 
executing the Spark task,
 
|----------------------------|------------------------------------------------------------------------------------------------------------------------------------|
 | Program type               | Supports Java, Scala, Python, and SQL.          
                                                                                
   |
 | The class of main function | The **full path** of Main Class, the entry 
point of the Spark program.                                                     
        |
+| Master                     | The master URL for the cluster.                 
                                                                                
   |
 | Main jar package           | The Spark jar package (upload by Resource 
Center).                                                                        
         |
 | SQL scripts                | SQL statements in .sql files that Spark sql 
runs.                                                                           
       |
 | Deployment mode            | <ul><li>spark submit supports three modes: 
cluster, client and local.</li><li>spark sql supports client and local 
modes.</li></ul> |
diff --git a/docs/docs/zh/guide/task/spark.md b/docs/docs/zh/guide/task/spark.md
index a392f55826..2f7b2ee346 100644
--- a/docs/docs/zh/guide/task/spark.md
+++ b/docs/docs/zh/guide/task/spark.md
@@ -23,6 +23,7 @@ Spark  任务类型用于执行 Spark 应用。对于 Spark 节点,worker 支
 - 程序类型:支持 Java、Scala、Python 和 SQL 四种语言。
 - 主函数的 Class:Spark 程序的入口 Main class 的全路径。
 - 主程序包:执行 Spark 程序的 jar 包(通过资源中心上传)。
+- Master:运行 Spark 集群的 Master URL。
 - SQL脚本:Spark sql 运行的 .sql 文件中的 SQL 语句。
 - 部署方式:(1) spark submit 支持 cluster、client 和 local 三种模式。
   (2) spark sql 支持 client 和 local 两种模式。
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
index c5fcb5b76b..873ba22c71 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
@@ -38,6 +38,11 @@ public class SparkParameters extends AbstractParameters {
      */
     private String mainClass;
 
+    /**
+     * master url
+     */
+    private String master;
+
     /**
      * deploy mode  local / cluster / client
      */
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index a0d1f3fc77..3c5fc17698 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -124,22 +124,31 @@ public class SparkTask extends AbstractYarnTask {
      */
     private List<String> populateSparkOptions() {
         List<String> args = new ArrayList<>();
-        args.add(SparkConstants.MASTER);
 
+        // see 
https://spark.apache.org/docs/latest/submitting-applications.html
+        // TODO remove the option 'local' from deploy-mode
         String deployMode = 
StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? 
sparkParameters.getDeployMode()
                 : SparkConstants.DEPLOY_MODE_LOCAL;
 
+        boolean onLocal = SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode);
         boolean onNativeKubernetes = 
StringUtils.isNotEmpty(sparkParameters.getNamespace());
 
-        String masterUrl = onNativeKubernetes ? SPARK_ON_K8S_MASTER_PREFIX +
-                
Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl()
-                : SparkConstants.SPARK_ON_YARN;
+        String masterUrl = StringUtils.isNotEmpty(sparkParameters.getMaster()) 
? sparkParameters.getMaster()
+                : onLocal ? deployMode
+                        : onNativeKubernetes
+                                ? SPARK_ON_K8S_MASTER_PREFIX + Config
+                                        .fromKubeconfig(
+                                                
taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
+                                        .getMasterUrl()
+                                : SparkConstants.SPARK_ON_YARN;
+
+        args.add(SparkConstants.MASTER);
+        args.add(masterUrl);
 
-        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
-            args.add(masterUrl);
+        if (!onLocal) {
             args.add(SparkConstants.DEPLOY_MODE);
+            args.add(deployMode);
         }
-        args.add(deployMode);
 
         ProgramType programType = sparkParameters.getProgramType();
         String mainClass = sparkParameters.getMainClass();
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
index ab164f2eb5..19ec707c62 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
@@ -54,6 +54,5 @@ public class SparkParametersTest {
         resourceFilesList = sparkParameters.getResourceFilesList();
         Assertions.assertNotNull(resourceFilesList);
         Assertions.assertEquals(3, resourceFilesList.size());
-
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 78d8968e59..5138562564 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -17,16 +17,27 @@
 
 package org.apache.dolphinscheduler.plugin.task.spark;
 
+import static 
org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.TYPE_FILE;
+import static org.mockito.ArgumentMatchers.any;
+
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
 
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -41,25 +52,67 @@ public class SparkTaskTest {
         Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
         Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
 
+        ResourceContext resourceContext = Mockito.mock(ResourceContext.class);
+        
Mockito.when(taskExecutionContext.getResourceContext()).thenReturn(resourceContext);
+        ResourceContext.ResourceItem resourceItem = new 
ResourceContext.ResourceItem();
+        resourceItem.setResourceAbsolutePathInLocal("test");
+        
Mockito.when(resourceContext.getResourceItem(any())).thenReturn(resourceItem);
+
+        try (MockedStatic<FileUtils> fileUtilsMockedStatic = 
Mockito.mockStatic(FileUtils.class)) {
+            fileUtilsMockedStatic
+                    .when(() -> FileUtils
+                            .readFileToString(any(File.class), 
any(Charset.class)))
+                    .thenReturn("test");
+
+            SparkTask sparkTask = Mockito.spy(new 
SparkTask(taskExecutionContext));
+            sparkTask.init();
+            Assertions.assertEquals(
+                    "${SPARK_HOME}/bin/spark-sql " +
+                            "--master yarn " +
+                            "--deploy-mode client " +
+                            "--conf spark.driver.cores=1 " +
+                            "--conf spark.driver.memory=512M " +
+                            "--conf spark.executor.instances=2 " +
+                            "--conf spark.executor.cores=2 " +
+                            "--conf spark.executor.memory=1G " +
+                            "--name sparksql " +
+                            "-f /tmp/5536_node.sql",
+                    sparkTask.getScript());
+        }
+    }
+
+    @Test
+    public void testBuildCommandWithSparkSubmit() {
+        String parameters = buildSparkParametersWithSparkSubmit();
+        TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
+        ResourceContext.ResourceItem resourceItem = new 
ResourceContext.ResourceItem();
+        
resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar");
+        
resourceItem.setResourceAbsolutePathInLocal("/lib/dolphinscheduler-task-spark.jar");
+        ResourceContext resourceContext = new ResourceContext();
+        resourceContext.addResourceItem(resourceItem);
+
+        
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        
Mockito.when(taskExecutionContext.getResourceContext()).thenReturn(resourceContext);
         SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
         sparkTask.init();
         Assertions.assertEquals(
-                "${SPARK_HOME}/bin/spark-sql " +
+                "${SPARK_HOME}/bin/spark-submit " +
                         "--master yarn " +
                         "--deploy-mode client " +
+                        "--class 
org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
                         "--conf spark.driver.cores=1 " +
                         "--conf spark.driver.memory=512M " +
                         "--conf spark.executor.instances=2 " +
                         "--conf spark.executor.cores=2 " +
                         "--conf spark.executor.memory=1G " +
-                        "--name sparksql " +
-                        "-f /tmp/5536_node.sql",
+                        "--name spark " +
+                        "/lib/dolphinscheduler-task-spark.jar",
                 sparkTask.getScript());
     }
 
     @Test
-    public void testBuildCommandWithSparkSubmit() {
-        String parameters = buildSparkParametersWithSparkSubmit();
+    public void testBuildCommandWithSparkSubmitMaster() {
+        String parameters = buildSparkParametersWithMaster();
         TaskExecutionContext taskExecutionContext = 
Mockito.mock(TaskExecutionContext.class);
         ResourceContext.ResourceItem resourceItem = new 
ResourceContext.ResourceItem();
         
resourceItem.setResourceAbsolutePathInStorage("/lib/dolphinscheduler-task-spark.jar");
@@ -73,7 +126,7 @@ public class SparkTaskTest {
         sparkTask.init();
         Assertions.assertEquals(
                 "${SPARK_HOME}/bin/spark-submit " +
-                        "--master yarn " +
+                        "--master spark://localhost:7077 " +
                         "--deploy-mode client " +
                         "--class 
org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
                         "--conf spark.driver.cores=1 " +
@@ -91,6 +144,7 @@ public class SparkTaskTest {
         sparkParameters.setLocalParams(Collections.emptyList());
         sparkParameters.setRawScript("selcet 11111;");
         sparkParameters.setProgramType(ProgramType.SQL);
+        sparkParameters.setSqlExecutionType(TYPE_FILE);
         sparkParameters.setMainClass("");
         sparkParameters.setDeployMode("client");
         sparkParameters.setAppName("sparksql");
@@ -100,6 +154,13 @@ public class SparkTaskTest {
         sparkParameters.setNumExecutors(2);
         sparkParameters.setExecutorMemory("1G");
         sparkParameters.setExecutorCores(2);
+
+        ResourceInfo resourceInfo1 = new ResourceInfo();
+        resourceInfo1.setResourceName("testSparkParameters1.jar");
+        List<ResourceInfo> resourceInfos = new ArrayList<>(Arrays.asList(
+                resourceInfo1));
+        sparkParameters.setResourceList(resourceInfos);
+
         return JSONUtils.toJsonString(sparkParameters);
     }
 
@@ -122,4 +183,24 @@ public class SparkTaskTest {
         return JSONUtils.toJsonString(sparkParameters);
     }
 
+    private String buildSparkParametersWithMaster() {
+        SparkParameters sparkParameters = new SparkParameters();
+        sparkParameters.setLocalParams(Collections.emptyList());
+        sparkParameters.setProgramType(ProgramType.SCALA);
+        
sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
+        sparkParameters.setDeployMode("client");
+        sparkParameters.setAppName("spark");
+        sparkParameters.setMaster("spark://localhost:7077");
+        sparkParameters.setOthers("");
+        sparkParameters.setDriverCores(1);
+        sparkParameters.setDriverMemory("512M");
+        sparkParameters.setNumExecutors(2);
+        sparkParameters.setExecutorMemory("1G");
+        sparkParameters.setExecutorCores(2);
+        ResourceInfo resourceInfo = new ResourceInfo();
+        resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar");
+        sparkParameters.setMainJar(resourceInfo);
+        return JSONUtils.toJsonString(sparkParameters);
+    }
+
 }
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts 
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index cb50b19fc7..7a39752526 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -447,6 +447,8 @@ export default {
     timeout_period_tips: 'Timeout must be a positive integer',
     script: 'Script',
     script_tips: 'Please enter script(required)',
+    master: 'Master',
+    master_tips: 'Please enter master url(required)',
     init_script: 'Initialization script',
     init_script_tips: 'Please enter initialization script',
     resources: 'Resources',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts 
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 6865a49abc..50e0a821ef 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -437,6 +437,8 @@ export default {
     timeout_period_tips: '超时时长必须为正整数',
     script: '脚本',
     script_tips: '请输入脚本(必填)',
+    master: 'Master',
+    master_tips: '请输入master url(必填)',
     init_script: '初始化脚本',
     init_script_tips: '请输入初始化脚本',
     resources: '资源',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
index ad7fb77fa9..ab89e69e6d 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
@@ -37,6 +37,10 @@ export function useSpark(model: { [field: string]: any }): 
IJsonItem[] {
     model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
   )
 
+  const masterSpan = computed(() =>
+    model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
+  )
+
   const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24))
 
   const rawScriptSpan = computed(() =>
@@ -138,6 +142,28 @@ export function useSpark(model: { [field: string]: any }): 
IJsonItem[] {
         message: t('project.node.script_tips')
       }
     },
+    {
+      type: 'input',
+      field: 'master',
+      span: masterSpan,
+      name: t('project.node.master'),
+      props: {
+        placeholder: t('project.node.master_tips')
+      },
+      validate: {
+        trigger: ['input', 'blur'],
+        required: false,
+        validator(validate: any, value: string) {
+          if (
+            model.programType !== 'PYTHON' &&
+            !value &&
+            model.programType !== 'SQL'
+          ) {
+            return new Error(t('project.node.master_tips'))
+          }
+        }
+      }
+    },
     useDeployMode(24, ref(true), showCluster),
     useNamespace(),
     {
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index ef9e5dec61..2ab1712b6f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -68,6 +68,7 @@ export function formatParams(data: INodeData): {
   }
 
   if (data.taskType === 'SPARK') {
+    taskParams.master = data.master
     taskParams.driverCores = data.driverCores
     taskParams.driverMemory = data.driverMemory
     taskParams.numExecutors = data.numExecutors
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
index 05aa0fe1c6..1e5c929b0f 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
@@ -45,7 +45,8 @@ export function useSpark({
     timeout: 30,
     programType: 'SCALA',
     rawScript: '',
-    deployMode: 'local',
+    master: '',
+    deployMode: '',
     driverCores: 1,
     driverMemory: '512M',
     numExecutors: 2,

Reply via email to