This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
     new e5dcbad  [1.3.6-prepare][Feature-4960][Spark] Add name input for Spark and improve Spark & MR args #4959 (#5017)
e5dcbad is described below

commit e5dcbad66d4b5853cdd100b5472e15613a290677
Author: Shiwen Cheng <[email protected]>
AuthorDate: Wed Mar 10 09:27:30 2021 +0800

    [1.3.6-prepare][Feature-4960][Spark] Add name input for Spark and improve Spark & MR args #4959 (#5017)
---
 .../dolphinscheduler/api/service/BaseService.java  |   1 +
 .../apache/dolphinscheduler/common/Constants.java  |   4 +
 .../common/task/spark/SparkParameters.java         | 369 +++++++++++----------
 .../server/utils/SparkArgsUtils.java               |  62 ++--
 .../server/worker/task/mr/MapReduceTask.java       |  38 ++-
 .../server/worker/task/spark/SparkTask.java        | 195 +++++------
 .../server/utils/FlinkArgsUtilsTest.java           |  24 +-
 .../server/utils/SparkArgsUtilsTest.java           |  65 ++--
 .../pages/dag/_source/formModel/tasks/spark.vue    |  17 +
 9 files changed, 415 insertions(+), 360 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
index 96bca0d..ae7fc6a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 
 import java.text.MessageFormat;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 7f7421f..0f0a181 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -594,6 +594,10 @@ public final class Constants {
      */
     public static final String EXECUTOR_MEMORY = "--executor-memory";
 
+    /**
+     * --name NAME
+     */
+    public static final String SPARK_NAME = "--name";
 
     /**
      * --queue QUEUE
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
index 32a2a6b..947f09e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.task.spark;
 
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,203 +29,214 @@ import java.util.List;
  */
 public class SparkParameters extends AbstractParameters {
 
-  /**
-   * major jar
-   */
-  private ResourceInfo mainJar;
-
-  /**
-   * major class
-   */
-  private String mainClass;
-
-  /**
-   * deploy mode
-   */
-  private String deployMode;
-
-  /**
-   * arguments
-   */
-  private String mainArgs;
-
-  /**
-   * driver-cores Number of cores used by the driver, only in cluster mode
-   */
-  private int driverCores;
-
-  /**
-   * driver-memory Memory for driver
-   */
-
-  private String driverMemory;
-
-  /**
-   * num-executors Number of executors to launch
-   */
-  private int numExecutors;
-
-  /**
-   * executor-cores Number of cores per executor
-   */
-  private int executorCores;
-
-  /**
-   * Memory per executor
-   */
-  private String executorMemory;
-
-  /**
-   * resource list
-   */
-  private List<ResourceInfo> resourceList = new ArrayList<>();
-
-  /**
-   * The YARN queue to submit to
-   */
-  private String queue;
-
-  /**
-   * other arguments
-   */
-  private String others;
-
-  /**
-   * program type
-   * 0 JAVA,1 SCALA,2 PYTHON
-   */
-  private ProgramType programType;
-
-  /**
-   * spark version
-   */
-  private String sparkVersion;
-
-  public ResourceInfo getMainJar() {
-    return mainJar;
-  }
-
-  public void setMainJar(ResourceInfo mainJar) {
-    this.mainJar = mainJar;
-  }
-
-  public String getMainClass() {
-    return mainClass;
-  }
-
-  public void setMainClass(String mainClass) {
-    this.mainClass = mainClass;
-  }
-
-  public String getDeployMode() {
-    return deployMode;
-  }
-
-  public void setDeployMode(String deployMode) {
-    this.deployMode = deployMode;
-  }
-
-  public String getMainArgs() {
-    return mainArgs;
-  }
-
-  public void setMainArgs(String mainArgs) {
-    this.mainArgs = mainArgs;
-  }
-
-  public int getDriverCores() {
-    return driverCores;
-  }
-
-  public void setDriverCores(int driverCores) {
-    this.driverCores = driverCores;
-  }
-
-  public String getDriverMemory() {
-    return driverMemory;
-  }
-
-  public void setDriverMemory(String driverMemory) {
-    this.driverMemory = driverMemory;
-  }
+    /**
+     * main jar
+     */
+    private ResourceInfo mainJar;
+
+    /**
+     * main class
+     */
+    private String mainClass;
+
+    /**
+     * deploy mode
+     */
+    private String deployMode;
+
+    /**
+     * arguments
+     */
+    private String mainArgs;
+
+    /**
+     * driver-cores Number of cores used by the driver, only in cluster mode
+     */
+    private int driverCores;
+
+    /**
+     * driver-memory Memory for driver
+     */
+
+    private String driverMemory;
+
+    /**
+     * num-executors Number of executors to launch
+     */
+    private int numExecutors;
+
+    /**
+     * executor-cores Number of cores per executor
+     */
+    private int executorCores;
+
+    /**
+     * Memory per executor
+     */
+    private String executorMemory;
+
+    /**
+     * app name
+     */
+    private String appName;
+
+    /**
+     * The YARN queue to submit to
+     */
+    private String queue;
+
+    /**
+     * other arguments
+     */
+    private String others;
+
+    /**
+     * program type
+     * 0 JAVA,1 SCALA,2 PYTHON
+     */
+    private ProgramType programType;
+
+    /**
+     * spark version
+     */
+    private String sparkVersion;
+
+    /**
+     * resource list
+     */
+    private List<ResourceInfo> resourceList = new ArrayList<>();
+
+    public ResourceInfo getMainJar() {
+        return mainJar;
+    }
+
+    public void setMainJar(ResourceInfo mainJar) {
+        this.mainJar = mainJar;
+    }
+
+    public String getMainClass() {
+        return mainClass;
+    }
+
+    public void setMainClass(String mainClass) {
+        this.mainClass = mainClass;
+    }
+
+    public String getDeployMode() {
+        return deployMode;
+    }
+
+    public void setDeployMode(String deployMode) {
+        this.deployMode = deployMode;
+    }
+
+    public String getMainArgs() {
+        return mainArgs;
+    }
+
+    public void setMainArgs(String mainArgs) {
+        this.mainArgs = mainArgs;
+    }
+
+    public int getDriverCores() {
+        return driverCores;
+    }
+
+    public void setDriverCores(int driverCores) {
+        this.driverCores = driverCores;
+    }
+
+    public String getDriverMemory() {
+        return driverMemory;
+    }
+
+    public void setDriverMemory(String driverMemory) {
+        this.driverMemory = driverMemory;
+    }
 
-  public int getNumExecutors() {
-    return numExecutors;
-  }
+    public int getNumExecutors() {
+        return numExecutors;
+    }
 
-  public void setNumExecutors(int numExecutors) {
-    this.numExecutors = numExecutors;
-  }
+    public void setNumExecutors(int numExecutors) {
+        this.numExecutors = numExecutors;
+    }
 
-  public int getExecutorCores() {
-    return executorCores;
-  }
-
-  public void setExecutorCores(int executorCores) {
-    this.executorCores = executorCores;
-  }
+    public int getExecutorCores() {
+        return executorCores;
+    }
 
-  public String getExecutorMemory() {
-    return executorMemory;
-  }
+    public void setExecutorCores(int executorCores) {
+        this.executorCores = executorCores;
+    }
 
-  public void setExecutorMemory(String executorMemory) {
-    this.executorMemory = executorMemory;
-  }
+    public String getExecutorMemory() {
+        return executorMemory;
+    }
 
+    public void setExecutorMemory(String executorMemory) {
+        this.executorMemory = executorMemory;
+    }
 
-  public String getQueue() {
-    return queue;
-  }
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
 
-  public void setQueue(String queue) {
-    this.queue = queue;
-  }
+    public String getQueue() {
+        return queue;
+    }
 
-  public List<ResourceInfo> getResourceList() {
-    return resourceList;
-  }
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
 
-  public void setResourceList(List<ResourceInfo> resourceList) {
-    this.resourceList = resourceList;
-  }
+    public String getOthers() {
+        return others;
+    }
 
-  public String getOthers() {
-    return others;
-  }
+    public void setOthers(String others) {
+        this.others = others;
+    }
 
-  public void setOthers(String others) {
-    this.others = others;
-  }
+    public List<ResourceInfo> getResourceList() {
+        return resourceList;
+    }
 
-  public ProgramType getProgramType() {
-    return programType;
-  }
+    public void setResourceList(List<ResourceInfo> resourceList) {
+        this.resourceList = resourceList;
+    }
 
-  public void setProgramType(ProgramType programType) {
-    this.programType = programType;
-  }
+    public ProgramType getProgramType() {
+        return programType;
+    }
 
-  public String getSparkVersion() {
-    return sparkVersion;
-  }
+    public void setProgramType(ProgramType programType) {
+        this.programType = programType;
+    }
 
-  public void setSparkVersion(String sparkVersion) {
-    this.sparkVersion = sparkVersion;
-  }
+    public String getSparkVersion() {
+        return sparkVersion;
+    }
 
-  @Override
-  public boolean checkParameters() {
-    return mainJar != null && programType != null;
-  }
+    public void setSparkVersion(String sparkVersion) {
+        this.sparkVersion = sparkVersion;
+    }
 
-  @Override
-  public List<ResourceInfo> getResourceFilesList() {
-    if (mainJar != null && !resourceList.contains(mainJar)) {
-      resourceList.add(mainJar);
+    @Override
+    public boolean checkParameters() {
+        return mainJar != null && programType != null;
     }
-    return resourceList;
-  }
 
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        if (mainJar != null && !resourceList.contains(mainJar)) {
+            resourceList.add(mainJar);
+        }
+        return resourceList;
+    }
 
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
index 5cc7bd8..76828f3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
@@ -14,24 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.utils;
 
+package org.apache.dolphinscheduler.server.utils;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
 
-
 /**
- *  spark args utils
+ * spark args utils
  */
 public class SparkArgsUtils {
 
+    private static final String SPARK_CLUSTER = "cluster";
+
+    private static final String SPARK_LOCAL = "local";
+
+    private static final String SPARK_ON_YARN = "yarn";
+
     /**
      * build args
      *
@@ -40,29 +45,24 @@ public class SparkArgsUtils {
      */
     public static List<String> buildArgs(SparkParameters param) {
         List<String> args = new ArrayList<>();
-        String deployMode = "cluster";
-
         args.add(Constants.MASTER);
-        if(StringUtils.isNotEmpty(param.getDeployMode())){
-            deployMode = param.getDeployMode();
 
-        }
-        if(!"local".equals(deployMode)){
-            args.add("yarn");
+        String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? 
param.getDeployMode() : SPARK_CLUSTER;
+        if (!SPARK_LOCAL.equals(deployMode)) {
+            args.add(SPARK_ON_YARN);
             args.add(Constants.DEPLOY_MODE);
         }
-
-        args.add(param.getDeployMode());
+        args.add(deployMode);
 
         ProgramType type = param.getProgramType();
         String mainClass = param.getMainClass();
-        if(type != null && type != ProgramType.PYTHON && 
StringUtils.isNotEmpty(mainClass)){
+        if (type != null && type != ProgramType.PYTHON && 
StringUtils.isNotEmpty(mainClass)) {
             args.add(Constants.MAIN_CLASS);
             args.add(mainClass);
         }
 
         int driverCores = param.getDriverCores();
-        if (driverCores != 0) {
+        if (driverCores > 0) {
             args.add(Constants.DRIVER_CORES);
             args.add(String.format("%d", driverCores));
         }
@@ -74,13 +74,13 @@ public class SparkArgsUtils {
         }
 
         int numExecutors = param.getNumExecutors();
-        if (numExecutors != 0) {
+        if (numExecutors > 0) {
             args.add(Constants.NUM_EXECUTORS);
             args.add(String.format("%d", numExecutors));
         }
 
         int executorCores = param.getExecutorCores();
-        if (executorCores != 0) {
+        if (executorCores > 0) {
             args.add(Constants.EXECUTOR_CORES);
             args.add(String.format("%d", executorCores));
         }
@@ -91,22 +91,26 @@ public class SparkArgsUtils {
             args.add(executorMemory);
         }
 
-        // --files --conf --libjar ...
-        String others = param.getOthers();
-        String queue = param.getQueue();
-        if (StringUtils.isNotEmpty(others)) {
+        String appName = param.getAppName();
+        if (StringUtils.isNotEmpty(appName)) {
+            args.add(Constants.SPARK_NAME);
+            args.add(ArgsUtils.escape(appName));
+        }
 
-            if(!others.contains(Constants.SPARK_QUEUE) && 
StringUtils.isNotEmpty(queue)){
-                args.add(Constants.SPARK_QUEUE);
-                args.add(queue);
+        String others = param.getOthers();
+        if (!SPARK_LOCAL.equals(deployMode)) {
+            if (StringUtils.isEmpty(others) || 
!others.contains(Constants.SPARK_QUEUE)) {
+                String queue = param.getQueue();
+                if (StringUtils.isNotEmpty(queue)) {
+                    args.add(Constants.SPARK_QUEUE);
+                    args.add(queue);
+                }
             }
+        }
 
+        // --conf --files --jars --packages
+        if (StringUtils.isNotEmpty(others)) {
             args.add(others);
-
-        }else if (StringUtils.isNotEmpty(queue)) {
-            args.add(Constants.SPARK_QUEUE);
-            args.add(queue);
-
         }
 
         ResourceInfo mainJar = param.getMainJar();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index fed7b27..43a7079 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -26,21 +26,27 @@ import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+
 /**
  * mapreduce task
  */
 public class MapReduceTask extends AbstractYarnTask {
 
+    /**
+     *  map reduce command
+     *  usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
+     */
+    private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
 
     /**
      * mapreduce parameters
@@ -77,7 +83,6 @@ public class MapReduceTask extends AbstractYarnTask {
         mapreduceParameters.setQueue(taskExecutionContext.getQueue());
         setMainJarName();
 
-
         // replace placeholder
         Map<String, Property> paramsMap = 
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                 taskExecutionContext.getDefinedParams(),
@@ -85,10 +90,10 @@ public class MapReduceTask extends AbstractYarnTask {
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
 
-        if (paramsMap != null){
+        if (paramsMap != null) {
             String args = 
ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),  
ParamUtils.convert(paramsMap));
             mapreduceParameters.setMainArgs(args);
-            if(mapreduceParameters.getProgramType() != null && 
mapreduceParameters.getProgramType() == ProgramType.PYTHON){
+            if (mapreduceParameters.getProgramType() != null && 
mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
                 String others = 
ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(),  
ParamUtils.convert(paramsMap));
                 mapreduceParameters.setOthers(others);
             }
@@ -102,9 +107,13 @@ public class MapReduceTask extends AbstractYarnTask {
      */
     @Override
     protected String buildCommand() throws Exception {
-        List<String> parameterList = buildParameters(mapreduceParameters);
+        // hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
+        List<String> args = new ArrayList<>();
+        args.add(MAP_REDUCE_COMMAND);
+
+        args.addAll(buildParameters(mapreduceParameters));
 
-        String command = 
ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
+        String command = 
ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
                 taskExecutionContext.getDefinedParams());
         logger.info("mapreduce task command: {}", command);
 
@@ -143,21 +152,18 @@ public class MapReduceTask extends AbstractYarnTask {
      * @param mapreduceParameters mapreduce parameters
      * @return parameter list
      */
-    private List<String> buildParameters(MapreduceParameters 
mapreduceParameters){
-
+    private List<String> buildParameters(MapreduceParameters 
mapreduceParameters) {
         List<String> result = new ArrayList<>();
 
-        result.add(Constants.HADOOP);
-
         // main jar
-        if(mapreduceParameters.getMainJar()!= null){
+        if (mapreduceParameters.getMainJar() != null) {
             result.add(Constants.JAR);
             result.add(mapreduceParameters.getMainJar().getRes());
         }
 
         // main class
-        if(!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
-                && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){
+        if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
+                && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())) 
{
             result.add(mapreduceParameters.getMainClass());
         }
 
@@ -170,13 +176,13 @@ public class MapReduceTask extends AbstractYarnTask {
             }
 
             result.add(mapreduceParameters.getOthers());
-        }else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
+        } else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
             result.add(String.format("%s %s=%s", Constants.D, 
Constants.MR_QUEUE, mapreduceParameters.getQueue()));
 
         }
 
         // command args
-        if(StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())){
+        if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) {
             result.add(mapreduceParameters.getMainArgs());
         }
         return result;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 505d88f..3a27399 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.spark;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -24,131 +25,137 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+
 /**
  * spark task
  */
 public class SparkTask extends AbstractYarnTask {
 
-  /**
-   * spark1 command
-   */
-  private static final String SPARK1_COMMAND = 
"${SPARK_HOME1}/bin/spark-submit";
+    /**
+     * spark1 command
+     * usage: spark-submit [options] <app jar | python file> [app arguments]
+     */
+    private static final String SPARK1_COMMAND = 
"${SPARK_HOME1}/bin/spark-submit";
+
+    /**
+     * spark2 command
+     * usage: spark-submit [options] <app jar | python file> [app arguments]
+     */
+    private static final String SPARK2_COMMAND = 
"${SPARK_HOME2}/bin/spark-submit";
+
+    /**
+     * spark parameters
+     */
+    private SparkParameters sparkParameters;
+
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext sparkTaskExecutionContext;
+
+    public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) 
{
+        super(taskExecutionContext, logger);
+        this.sparkTaskExecutionContext = taskExecutionContext;
+    }
 
-  /**
-   * spark2 command
-   */
-  private static final String SPARK2_COMMAND = 
"${SPARK_HOME2}/bin/spark-submit";
+    @Override
+    public void init() {
 
-  /**
-   *  spark parameters
-   */
-  private SparkParameters sparkParameters;
+        logger.info("spark task params {}", 
sparkTaskExecutionContext.getTaskParams());
 
-  /**
-   * taskExecutionContext
-   */
-  private TaskExecutionContext taskExecutionContext;
+        sparkParameters = 
JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(), 
SparkParameters.class);
 
-  public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
-    super(taskExecutionContext, logger);
-    this.taskExecutionContext = taskExecutionContext;
-  }
+        if (null == sparkParameters) {
+            logger.error("Spark params is null");
+            return;
+        }
 
-  @Override
-  public void init() {
+        if (!sparkParameters.checkParameters()) {
+            throw new RuntimeException("spark task params is not valid");
+        }
+        sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
+        setMainJarName();
+    }
 
-    logger.info("spark task params {}", taskExecutionContext.getTaskParams());
+    /**
+     * create command
+     *
+     * @return command
+     */
+    @Override
+    protected String buildCommand() {
+        // spark-submit [options] <app jar | python file> [app arguments]
+        List<String> args = new ArrayList<>();
+
+        //spark version
+        String sparkCommand = SPARK2_COMMAND;
+
+        if 
(SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
+            sparkCommand = SPARK1_COMMAND;
+        }
 
-    sparkParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
SparkParameters.class);
+        args.add(sparkCommand);
 
-    if (!sparkParameters.checkParameters()) {
-      throw new RuntimeException("spark task params is not valid");
-    }
-    sparkParameters.setQueue(taskExecutionContext.getQueue());
+        // other parameters
+        args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
 
-    setMainJarName();
+        // replace placeholder
+        Map<String, Property> paramsMap = 
ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
+            sparkTaskExecutionContext.getDefinedParams(),
+            sparkParameters.getLocalParametersMap(),
+            CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
+            sparkTaskExecutionContext.getScheduleTime());
 
-    if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
-      String args = sparkParameters.getMainArgs();
+        String command = null;
 
-      // replace placeholder
-      Map<String, Property> paramsMap = 
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-              taskExecutionContext.getDefinedParams(),
-              sparkParameters.getLocalParametersMap(),
-              CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-              taskExecutionContext.getScheduleTime());
+        if (null != paramsMap) {
+            command = 
ParameterUtils.convertParameterPlaceholders(String.join(" ", args), 
ParamUtils.convert(paramsMap));
+        }
 
-      if (paramsMap != null ){
-        args = ParameterUtils.convertParameterPlaceholders(args, 
ParamUtils.convert(paramsMap));
-      }
-      sparkParameters.setMainArgs(args);
-    }
-  }
+        logger.info("spark task command: {}", command);
 
-  /**
-   * create command
-   * @return command
-   */
-  @Override
-  protected String buildCommand() {
-    List<String> args = new ArrayList<>();
+        return command;
+    }
 
-    //spark version
-    String sparkCommand = SPARK2_COMMAND;
+    @Override
+    protected void setMainJarName() {
+        // main jar
+        ResourceInfo mainJar = sparkParameters.getMainJar();
 
-    if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
-      sparkCommand = SPARK1_COMMAND;
-    }
+        if (null == mainJar) {
+            throw new RuntimeException("Spark task jar params is null");
+        }
 
-    args.add(sparkCommand);
-
-    // other parameters
-    args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
-
-    String command = ParameterUtils
-            .convertParameterPlaceholders(String.join(" ", args), 
taskExecutionContext.getDefinedParams());
-
-    logger.info("spark task command : {}", command);
-
-    return command;
-  }
-
-  @Override
-  protected void setMainJarName() {
-    // main jar
-    ResourceInfo mainJar = sparkParameters.getMainJar();
-    if (mainJar != null) {
-      int resourceId = mainJar.getId();
-      String resourceName;
-      if (resourceId == 0) {
-        resourceName = mainJar.getRes();
-      } else {
-        Resource resource = 
processService.getResourceById(sparkParameters.getMainJar().getId());
-        if (resource == null) {
-          logger.error("resource id: {} not exist", resourceId);
-          throw new RuntimeException(String.format("resource id: %d not 
exist", resourceId));
+        int resourceId = mainJar.getId();
+        String resourceName;
+        if (resourceId == 0) {
+            resourceName = mainJar.getRes();
+        } else {
+            Resource resource = 
processService.getResourceById(sparkParameters.getMainJar().getId());
+            if (resource == null) {
+                logger.error("resource id: {} not exist", resourceId);
+                throw new RuntimeException(String.format("resource id: %d not 
exist", resourceId));
+            }
+            resourceName = resource.getFullName().replaceFirst("/", "");
         }
-        resourceName = resource.getFullName().replaceFirst("/", "");
-      }
-      mainJar.setRes(resourceName);
-      sparkParameters.setMainJar(mainJar);
+        mainJar.setRes(resourceName);
+        sparkParameters.setMainJar(mainJar);
+
     }
-  }
 
-  @Override
-  public AbstractParameters getParameters() {
-    return sparkParameters;
-  }
+    @Override
+    public AbstractParameters getParameters() {
+        return sparkParameters;
+    }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index f030628..dd12029 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -98,35 +98,35 @@ public class FlinkArgsUtilsTest {
         assertEquals("yarn-cluster", result.get(1));
 
         assertEquals("-ys", result.get(2));
-        assertSame(Integer.valueOf(result.get(3)),slot);
+        assertSame(slot, Integer.valueOf(result.get(3)));
 
-        assertEquals("-ynm",result.get(4));
-        assertEquals(result.get(5),appName);
+        assertEquals("-ynm", result.get(4));
+        assertEquals(appName, result.get(5));
 
         assertEquals("-yn", result.get(6));
-        assertSame(Integer.valueOf(result.get(7)),taskManager);
+        assertSame(taskManager, Integer.valueOf(result.get(7)));
 
         assertEquals("-yjm", result.get(8));
-        assertEquals(result.get(9),jobManagerMemory);
+        assertEquals(jobManagerMemory, result.get(9));
 
         assertEquals("-ytm", result.get(10));
-        assertEquals(result.get(11),taskManagerMemory);
+        assertEquals(taskManagerMemory, result.get(11));
 
         assertEquals("-yqu", result.get(12));
-        assertEquals(result.get(13),queue);
+        assertEquals(queue, result.get(13));
 
         assertEquals("-p", result.get(14));
-        assertSame(Integer.valueOf(result.get(15)),parallelism);
+        assertSame(parallelism, Integer.valueOf(result.get(15)));
 
         assertEquals("-sae", result.get(16));
 
-        assertEquals(result.get(17),others);
+        assertEquals(others, result.get(17));
 
         assertEquals("-c", result.get(18));
-        assertEquals(result.get(19),mainClass);
+        assertEquals(mainClass, result.get(19));
 
-        assertEquals(result.get(20),mainJar.getRes());
-        assertEquals(result.get(21),mainArgs);
+        assertEquals(mainJar.getRes(), result.get(20));
+        assertEquals(mainArgs, result.get(21));
 
         //Others param without -yqu
         FlinkParameters param1 = new FlinkParameters();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
index 6e55fa7..f76c2ea 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
@@ -17,19 +17,20 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
+
+import java.util.List;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
 /**
  * Test SparkArgsUtils
  */
@@ -48,12 +49,11 @@ public class SparkArgsUtilsTest {
     public int executorCores = 6;
     public String sparkVersion = "SPARK1";
     public int numExecutors = 4;
+    public String appName = "spark test";
     public String queue = "queue1";
 
-
     @Before
-    public void setUp() throws Exception {
-
+    public void setUp() {
         ResourceInfo main = new ResourceInfo();
         main.setRes("testspark-1.0.0-SNAPSHOT.jar");
         mainJar = main;
@@ -78,6 +78,7 @@ public class SparkArgsUtilsTest {
         param.setProgramType(programType);
         param.setSparkVersion(sparkVersion);
         param.setMainArgs(mainArgs);
+        param.setAppName(appName);
         param.setQueue(queue);
 
         //Invoke buildArgs
@@ -87,42 +88,46 @@ public class SparkArgsUtilsTest {
         }
 
         //Expected values and order
-        assertEquals(result.size(),20);
+        assertEquals(22, result.size());
+
+        assertEquals("--master", result.get(0));
+        assertEquals("yarn", result.get(1));
+
+        assertEquals("--deploy-mode", result.get(2));
+        assertEquals(mode, result.get(3));
 
-        assertEquals(result.get(0),"--master");
-        assertEquals(result.get(1),"yarn");
+        assertEquals("--class", result.get(4));
+        assertEquals(mainClass, result.get(5));
 
-        assertEquals(result.get(2),"--deploy-mode");
-        assertEquals(result.get(3),mode);
+        assertEquals("--driver-cores", result.get(6));
+        assertSame(driverCores, Integer.valueOf(result.get(7)));
 
-        assertEquals(result.get(4),"--class");
-        assertEquals(result.get(5),mainClass);
+        assertEquals("--driver-memory", result.get(8));
+        assertEquals(driverMemory, result.get(9));
 
-        assertEquals(result.get(6),"--driver-cores");
-        assertSame(Integer.valueOf(result.get(7)),driverCores);
+        assertEquals("--num-executors", result.get(10));
+        assertSame(numExecutors, Integer.valueOf(result.get(11)));
 
-        assertEquals(result.get(8),"--driver-memory");
-        assertEquals(result.get(9),driverMemory);
+        assertEquals("--executor-cores", result.get(12));
+        assertSame(executorCores, Integer.valueOf(result.get(13)));
 
-        assertEquals(result.get(10),"--num-executors");
-        assertSame(Integer.valueOf(result.get(11)),numExecutors);
+        assertEquals("--executor-memory", result.get(14));
+        assertEquals(executorMemory, result.get(15));
 
-        assertEquals(result.get(12),"--executor-cores");
-        assertSame(Integer.valueOf(result.get(13)),executorCores);
+        assertEquals("--name", result.get(16));
+        assertEquals(ArgsUtils.escape(appName), result.get(17));
 
-        assertEquals(result.get(14),"--executor-memory");
-        assertEquals(result.get(15),executorMemory);
+        assertEquals("--queue", result.get(18));
+        assertEquals(queue, result.get(19));
 
-        assertEquals(result.get(16),"--queue");
-        assertEquals(result.get(17),queue);
-        assertEquals(result.get(18),mainJar.getRes());
-        assertEquals(result.get(19),mainArgs);
+        assertEquals(mainJar.getRes(), result.get(20));
+        assertEquals(mainArgs, result.get(21));
 
         //Others param without --queue
         SparkParameters param1 = new SparkParameters();
         param1.setOthers("--files xxx/hive-site.xml");
         param1.setQueue(queue);
         result = SparkArgsUtils.buildArgs(param1);
-        assertEquals(result.size(),7);
+        assertEquals(7, result.size());
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
index 78ac5c6..1dd07d2 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
@@ -79,6 +79,18 @@
         </x-radio-group>
       </div>
     </m-list-box>
+    <m-list-box>
+      <div slot="text">{{$t('App Name')}}</div>
+      <div slot="content">
+        <x-input
+                :disabled="isDetails"
+                type="input"
+                v-model="appName"
+                :placeholder="$t('Please enter app name(optional)')"
+                autocomplete="off">
+        </x-input>
+      </div>
+    </m-list-box>
     <div class="list-box-4p">
       <div class="clearfix list">
         <span class="sp1">{{$t('Driver Cores')}}</span>
@@ -241,6 +253,8 @@
         executorMemory: '2G',
         // Executor cores
         executorCores: 2,
+        // Spark app name
+        appName: '',
         // Main arguments
         mainArgs: '',
         // Option parameters
@@ -475,6 +489,7 @@
           numExecutors: this.numExecutors,
           executorMemory: this.executorMemory,
           executorCores: this.executorCores,
+          appName: this.appName,
           mainArgs: this.mainArgs,
           others: this.others,
           programType: this.programType,
@@ -534,6 +549,7 @@
           numExecutors: this.numExecutors,
           executorMemory: this.executorMemory,
           executorCores: this.executorCores,
+          appName: this.appName,
           mainArgs: this.mainArgs,
           others: this.others,
           programType: this.programType,
@@ -575,6 +591,7 @@
           this.numExecutors = o.params.numExecutors || 2
           this.executorMemory = o.params.executorMemory || '2G'
           this.executorCores = o.params.executorCores || 2
+          this.appName = o.params.appName || ''
           this.mainArgs = o.params.mainArgs || ''
           this.others = o.params.others
           this.programType = o.params.programType || 'SCALA'

Reply via email to