This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch 1.3.6-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.6-prepare by this push:
new e5dcbad [1.3.6-prepare][Feature-4960][Spark] Add name input for Spark and improve Spark & MR args #4959 (#5017)
e5dcbad is described below
commit e5dcbad66d4b5853cdd100b5472e15613a290677
Author: Shiwen Cheng <[email protected]>
AuthorDate: Wed Mar 10 09:27:30 2021 +0800
[1.3.6-prepare][Feature-4960][Spark] Add name input for Spark and improve Spark & MR args #4959 (#5017)
---
.../dolphinscheduler/api/service/BaseService.java | 1 +
.../apache/dolphinscheduler/common/Constants.java | 4 +
.../common/task/spark/SparkParameters.java | 369 +++++++++++----------
.../server/utils/SparkArgsUtils.java | 62 ++--
.../server/worker/task/mr/MapReduceTask.java | 38 ++-
.../server/worker/task/spark/SparkTask.java | 195 +++++------
.../server/utils/FlinkArgsUtilsTest.java | 24 +-
.../server/utils/SparkArgsUtilsTest.java | 65 ++--
.../pages/dag/_source/formModel/tasks/spark.vue | 17 +
9 files changed, 415 insertions(+), 360 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
index 96bca0d..ae7fc6a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import java.text.MessageFormat;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 7f7421f..0f0a181 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -594,6 +594,10 @@ public final class Constants {
*/
public static final String EXECUTOR_MEMORY = "--executor-memory";
+ /**
+ * --name NAME
+ */
+ public static final String SPARK_NAME = "--name";
/**
* --queue QUEUE
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
index 32a2a6b..947f09e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.task.spark;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
@@ -29,203 +29,214 @@ import java.util.List;
*/
public class SparkParameters extends AbstractParameters {
- /**
- * major jar
- */
- private ResourceInfo mainJar;
-
- /**
- * major class
- */
- private String mainClass;
-
- /**
- * deploy mode
- */
- private String deployMode;
-
- /**
- * arguments
- */
- private String mainArgs;
-
- /**
- * driver-cores Number of cores used by the driver, only in cluster mode
- */
- private int driverCores;
-
- /**
- * driver-memory Memory for driver
- */
-
- private String driverMemory;
-
- /**
- * num-executors Number of executors to launch
- */
- private int numExecutors;
-
- /**
- * executor-cores Number of cores per executor
- */
- private int executorCores;
-
- /**
- * Memory per executor
- */
- private String executorMemory;
-
- /**
- * resource list
- */
- private List<ResourceInfo> resourceList = new ArrayList<>();
-
- /**
- * The YARN queue to submit to
- */
- private String queue;
-
- /**
- * other arguments
- */
- private String others;
-
- /**
- * program type
- * 0 JAVA,1 SCALA,2 PYTHON
- */
- private ProgramType programType;
-
- /**
- * spark version
- */
- private String sparkVersion;
-
- public ResourceInfo getMainJar() {
- return mainJar;
- }
-
- public void setMainJar(ResourceInfo mainJar) {
- this.mainJar = mainJar;
- }
-
- public String getMainClass() {
- return mainClass;
- }
-
- public void setMainClass(String mainClass) {
- this.mainClass = mainClass;
- }
-
- public String getDeployMode() {
- return deployMode;
- }
-
- public void setDeployMode(String deployMode) {
- this.deployMode = deployMode;
- }
-
- public String getMainArgs() {
- return mainArgs;
- }
-
- public void setMainArgs(String mainArgs) {
- this.mainArgs = mainArgs;
- }
-
- public int getDriverCores() {
- return driverCores;
- }
-
- public void setDriverCores(int driverCores) {
- this.driverCores = driverCores;
- }
-
- public String getDriverMemory() {
- return driverMemory;
- }
-
- public void setDriverMemory(String driverMemory) {
- this.driverMemory = driverMemory;
- }
+ /**
+ * main jar
+ */
+ private ResourceInfo mainJar;
+
+ /**
+ * main class
+ */
+ private String mainClass;
+
+ /**
+ * deploy mode
+ */
+ private String deployMode;
+
+ /**
+ * arguments
+ */
+ private String mainArgs;
+
+ /**
+ * driver-cores Number of cores used by the driver, only in cluster mode
+ */
+ private int driverCores;
+
+ /**
+ * driver-memory Memory for driver
+ */
+
+ private String driverMemory;
+
+ /**
+ * num-executors Number of executors to launch
+ */
+ private int numExecutors;
+
+ /**
+ * executor-cores Number of cores per executor
+ */
+ private int executorCores;
+
+ /**
+ * Memory per executor
+ */
+ private String executorMemory;
+
+ /**
+ * app name
+ */
+ private String appName;
+
+ /**
+ * The YARN queue to submit to
+ */
+ private String queue;
+
+ /**
+ * other arguments
+ */
+ private String others;
+
+ /**
+ * program type
+ * 0 JAVA,1 SCALA,2 PYTHON
+ */
+ private ProgramType programType;
+
+ /**
+ * spark version
+ */
+ private String sparkVersion;
+
+ /**
+ * resource list
+ */
+ private List<ResourceInfo> resourceList = new ArrayList<>();
+
+ public ResourceInfo getMainJar() {
+ return mainJar;
+ }
+
+ public void setMainJar(ResourceInfo mainJar) {
+ this.mainJar = mainJar;
+ }
+
+ public String getMainClass() {
+ return mainClass;
+ }
+
+ public void setMainClass(String mainClass) {
+ this.mainClass = mainClass;
+ }
+
+ public String getDeployMode() {
+ return deployMode;
+ }
+
+ public void setDeployMode(String deployMode) {
+ this.deployMode = deployMode;
+ }
+
+ public String getMainArgs() {
+ return mainArgs;
+ }
+
+ public void setMainArgs(String mainArgs) {
+ this.mainArgs = mainArgs;
+ }
+
+ public int getDriverCores() {
+ return driverCores;
+ }
+
+ public void setDriverCores(int driverCores) {
+ this.driverCores = driverCores;
+ }
+
+ public String getDriverMemory() {
+ return driverMemory;
+ }
+
+ public void setDriverMemory(String driverMemory) {
+ this.driverMemory = driverMemory;
+ }
- public int getNumExecutors() {
- return numExecutors;
- }
+ public int getNumExecutors() {
+ return numExecutors;
+ }
- public void setNumExecutors(int numExecutors) {
- this.numExecutors = numExecutors;
- }
+ public void setNumExecutors(int numExecutors) {
+ this.numExecutors = numExecutors;
+ }
- public int getExecutorCores() {
- return executorCores;
- }
-
- public void setExecutorCores(int executorCores) {
- this.executorCores = executorCores;
- }
+ public int getExecutorCores() {
+ return executorCores;
+ }
- public String getExecutorMemory() {
- return executorMemory;
- }
+ public void setExecutorCores(int executorCores) {
+ this.executorCores = executorCores;
+ }
- public void setExecutorMemory(String executorMemory) {
- this.executorMemory = executorMemory;
- }
+ public String getExecutorMemory() {
+ return executorMemory;
+ }
+ public void setExecutorMemory(String executorMemory) {
+ this.executorMemory = executorMemory;
+ }
- public String getQueue() {
- return queue;
- }
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
+ }
- public void setQueue(String queue) {
- this.queue = queue;
- }
+ public String getQueue() {
+ return queue;
+ }
- public List<ResourceInfo> getResourceList() {
- return resourceList;
- }
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
- public void setResourceList(List<ResourceInfo> resourceList) {
- this.resourceList = resourceList;
- }
+ public String getOthers() {
+ return others;
+ }
- public String getOthers() {
- return others;
- }
+ public void setOthers(String others) {
+ this.others = others;
+ }
- public void setOthers(String others) {
- this.others = others;
- }
+ public List<ResourceInfo> getResourceList() {
+ return resourceList;
+ }
- public ProgramType getProgramType() {
- return programType;
- }
+ public void setResourceList(List<ResourceInfo> resourceList) {
+ this.resourceList = resourceList;
+ }
- public void setProgramType(ProgramType programType) {
- this.programType = programType;
- }
+ public ProgramType getProgramType() {
+ return programType;
+ }
- public String getSparkVersion() {
- return sparkVersion;
- }
+ public void setProgramType(ProgramType programType) {
+ this.programType = programType;
+ }
- public void setSparkVersion(String sparkVersion) {
- this.sparkVersion = sparkVersion;
- }
+ public String getSparkVersion() {
+ return sparkVersion;
+ }
- @Override
- public boolean checkParameters() {
- return mainJar != null && programType != null;
- }
+ public void setSparkVersion(String sparkVersion) {
+ this.sparkVersion = sparkVersion;
+ }
- @Override
- public List<ResourceInfo> getResourceFilesList() {
- if (mainJar != null && !resourceList.contains(mainJar)) {
- resourceList.add(mainJar);
+ @Override
+ public boolean checkParameters() {
+ return mainJar != null && programType != null;
}
- return resourceList;
- }
+ @Override
+ public List<ResourceInfo> getResourceFilesList() {
+ if (mainJar != null && !resourceList.contains(mainJar)) {
+ resourceList.add(mainJar);
+ }
+ return resourceList;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
index 5cc7bd8..76828f3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
@@ -14,24 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.utils;
+package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
-
/**
- * spark args utils
+ * spark args utils
*/
public class SparkArgsUtils {
+ private static final String SPARK_CLUSTER = "cluster";
+
+ private static final String SPARK_LOCAL = "local";
+
+ private static final String SPARK_ON_YARN = "yarn";
+
/**
* build args
*
@@ -40,29 +45,24 @@ public class SparkArgsUtils {
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
- String deployMode = "cluster";
-
args.add(Constants.MASTER);
- if(StringUtils.isNotEmpty(param.getDeployMode())){
- deployMode = param.getDeployMode();
- }
- if(!"local".equals(deployMode)){
- args.add("yarn");
+ String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ?
param.getDeployMode() : SPARK_CLUSTER;
+ if (!SPARK_LOCAL.equals(deployMode)) {
+ args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}
-
- args.add(param.getDeployMode());
+ args.add(deployMode);
ProgramType type = param.getProgramType();
String mainClass = param.getMainClass();
- if(type != null && type != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)){
+ if (type != null && type != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.MAIN_CLASS);
args.add(mainClass);
}
int driverCores = param.getDriverCores();
- if (driverCores != 0) {
+ if (driverCores > 0) {
args.add(Constants.DRIVER_CORES);
args.add(String.format("%d", driverCores));
}
@@ -74,13 +74,13 @@ public class SparkArgsUtils {
}
int numExecutors = param.getNumExecutors();
- if (numExecutors != 0) {
+ if (numExecutors > 0) {
args.add(Constants.NUM_EXECUTORS);
args.add(String.format("%d", numExecutors));
}
int executorCores = param.getExecutorCores();
- if (executorCores != 0) {
+ if (executorCores > 0) {
args.add(Constants.EXECUTOR_CORES);
args.add(String.format("%d", executorCores));
}
@@ -91,22 +91,26 @@ public class SparkArgsUtils {
args.add(executorMemory);
}
- // --files --conf --libjar ...
- String others = param.getOthers();
- String queue = param.getQueue();
- if (StringUtils.isNotEmpty(others)) {
+ String appName = param.getAppName();
+ if (StringUtils.isNotEmpty(appName)) {
+ args.add(Constants.SPARK_NAME);
+ args.add(ArgsUtils.escape(appName));
+ }
- if(!others.contains(Constants.SPARK_QUEUE) &&
StringUtils.isNotEmpty(queue)){
- args.add(Constants.SPARK_QUEUE);
- args.add(queue);
+ String others = param.getOthers();
+ if (!SPARK_LOCAL.equals(deployMode)) {
+ if (StringUtils.isEmpty(others) ||
!others.contains(Constants.SPARK_QUEUE)) {
+ String queue = param.getQueue();
+ if (StringUtils.isNotEmpty(queue)) {
+ args.add(Constants.SPARK_QUEUE);
+ args.add(queue);
+ }
}
+ }
+ // --conf --files --jars --packages
+ if (StringUtils.isNotEmpty(others)) {
args.add(others);
-
- }else if (StringUtils.isNotEmpty(queue)) {
- args.add(Constants.SPARK_QUEUE);
- args.add(queue);
-
}
ResourceInfo mainJar = param.getMainJar();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index fed7b27..43a7079 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -26,21 +26,27 @@ import
org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+
/**
* mapreduce task
*/
public class MapReduceTask extends AbstractYarnTask {
+ /**
+ * map reduce command
+ * usage: hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
+ */
+ private static final String MAP_REDUCE_COMMAND = Constants.HADOOP;
/**
* mapreduce parameters
@@ -77,7 +83,6 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
-
// replace placeholder
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
@@ -85,10 +90,10 @@ public class MapReduceTask extends AbstractYarnTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
- if (paramsMap != null){
+ if (paramsMap != null) {
String args =
ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),
ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
- if(mapreduceParameters.getProgramType() != null &&
mapreduceParameters.getProgramType() == ProgramType.PYTHON){
+ if (mapreduceParameters.getProgramType() != null &&
mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
String others =
ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(),
ParamUtils.convert(paramsMap));
mapreduceParameters.setOthers(others);
}
@@ -102,9 +107,13 @@ public class MapReduceTask extends AbstractYarnTask {
*/
@Override
protected String buildCommand() throws Exception {
- List<String> parameterList = buildParameters(mapreduceParameters);
+ // hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
+ List<String> args = new ArrayList<>();
+ args.add(MAP_REDUCE_COMMAND);
+
+ args.addAll(buildParameters(mapreduceParameters));
- String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
+ String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
logger.info("mapreduce task command: {}", command);
@@ -143,21 +152,18 @@ public class MapReduceTask extends AbstractYarnTask {
* @param mapreduceParameters mapreduce parameters
* @return parameter list
*/
- private List<String> buildParameters(MapreduceParameters
mapreduceParameters){
-
+ private List<String> buildParameters(MapreduceParameters
mapreduceParameters) {
List<String> result = new ArrayList<>();
- result.add(Constants.HADOOP);
-
// main jar
- if(mapreduceParameters.getMainJar()!= null){
+ if (mapreduceParameters.getMainJar() != null) {
result.add(Constants.JAR);
result.add(mapreduceParameters.getMainJar().getRes());
}
// main class
- if(!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
- && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){
+ if (!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
+ && StringUtils.isNotEmpty(mapreduceParameters.getMainClass()))
{
result.add(mapreduceParameters.getMainClass());
}
@@ -170,13 +176,13 @@ public class MapReduceTask extends AbstractYarnTask {
}
result.add(mapreduceParameters.getOthers());
- }else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
+ } else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D,
Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
// command args
- if(StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())){
+ if (StringUtils.isNotEmpty(mapreduceParameters.getMainArgs())) {
result.add(mapreduceParameters.getMainArgs());
}
return result;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 505d88f..3a27399 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -24,131 +25,137 @@ import
org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+
/**
* spark task
*/
public class SparkTask extends AbstractYarnTask {
- /**
- * spark1 command
- */
- private static final String SPARK1_COMMAND =
"${SPARK_HOME1}/bin/spark-submit";
+ /**
+ * spark1 command
+ * usage: spark-submit [options] <app jar | python file> [app arguments]
+ */
+ private static final String SPARK1_COMMAND =
"${SPARK_HOME1}/bin/spark-submit";
+
+ /**
+ * spark2 command
+ * usage: spark-submit [options] <app jar | python file> [app arguments]
+ */
+ private static final String SPARK2_COMMAND =
"${SPARK_HOME2}/bin/spark-submit";
+
+ /**
+ * spark parameters
+ */
+ private SparkParameters sparkParameters;
+
+ /**
+ * taskExecutionContext
+ */
+ private final TaskExecutionContext sparkTaskExecutionContext;
+
+ public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger)
{
+ super(taskExecutionContext, logger);
+ this.sparkTaskExecutionContext = taskExecutionContext;
+ }
- /**
- * spark2 command
- */
- private static final String SPARK2_COMMAND =
"${SPARK_HOME2}/bin/spark-submit";
+ @Override
+ public void init() {
- /**
- * spark parameters
- */
- private SparkParameters sparkParameters;
+ logger.info("spark task params {}",
sparkTaskExecutionContext.getTaskParams());
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ sparkParameters =
JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(),
SparkParameters.class);
- public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
- super(taskExecutionContext, logger);
- this.taskExecutionContext = taskExecutionContext;
- }
+ if (null == sparkParameters) {
+ logger.error("Spark params is null");
+ return;
+ }
- @Override
- public void init() {
+ if (!sparkParameters.checkParameters()) {
+ throw new RuntimeException("spark task params is not valid");
+ }
+ sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
+ setMainJarName();
+ }
- logger.info("spark task params {}", taskExecutionContext.getTaskParams());
+ /**
+ * create command
+ *
+ * @return command
+ */
+ @Override
+ protected String buildCommand() {
+ // spark-submit [options] <app jar | python file> [app arguments]
+ List<String> args = new ArrayList<>();
+
+ //spark version
+ String sparkCommand = SPARK2_COMMAND;
+
+ if
(SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
+ sparkCommand = SPARK1_COMMAND;
+ }
- sparkParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
SparkParameters.class);
+ args.add(sparkCommand);
- if (!sparkParameters.checkParameters()) {
- throw new RuntimeException("spark task params is not valid");
- }
- sparkParameters.setQueue(taskExecutionContext.getQueue());
+ // other parameters
+ args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
- setMainJarName();
+ // replace placeholder
+ Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
+ sparkTaskExecutionContext.getDefinedParams(),
+ sparkParameters.getLocalParametersMap(),
+ CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
+ sparkTaskExecutionContext.getScheduleTime());
- if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
- String args = sparkParameters.getMainArgs();
+ String command = null;
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- sparkParameters.getLocalParametersMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ if (null != paramsMap) {
+ command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
ParamUtils.convert(paramsMap));
+ }
- if (paramsMap != null ){
- args = ParameterUtils.convertParameterPlaceholders(args,
ParamUtils.convert(paramsMap));
- }
- sparkParameters.setMainArgs(args);
- }
- }
+ logger.info("spark task command: {}", command);
- /**
- * create command
- * @return command
- */
- @Override
- protected String buildCommand() {
- List<String> args = new ArrayList<>();
+ return command;
+ }
- //spark version
- String sparkCommand = SPARK2_COMMAND;
+ @Override
+ protected void setMainJarName() {
+ // main jar
+ ResourceInfo mainJar = sparkParameters.getMainJar();
- if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
- sparkCommand = SPARK1_COMMAND;
- }
+ if (null == mainJar) {
+ throw new RuntimeException("Spark task jar params is null");
+ }
- args.add(sparkCommand);
-
- // other parameters
- args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
-
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
-
- logger.info("spark task command : {}", command);
-
- return command;
- }
-
- @Override
- protected void setMainJarName() {
- // main jar
- ResourceInfo mainJar = sparkParameters.getMainJar();
- if (mainJar != null) {
- int resourceId = mainJar.getId();
- String resourceName;
- if (resourceId == 0) {
- resourceName = mainJar.getRes();
- } else {
- Resource resource =
processService.getResourceById(sparkParameters.getMainJar().getId());
- if (resource == null) {
- logger.error("resource id: {} not exist", resourceId);
- throw new RuntimeException(String.format("resource id: %d not
exist", resourceId));
+ int resourceId = mainJar.getId();
+ String resourceName;
+ if (resourceId == 0) {
+ resourceName = mainJar.getRes();
+ } else {
+ Resource resource =
processService.getResourceById(sparkParameters.getMainJar().getId());
+ if (resource == null) {
+ logger.error("resource id: {} not exist", resourceId);
+ throw new RuntimeException(String.format("resource id: %d not
exist", resourceId));
+ }
+ resourceName = resource.getFullName().replaceFirst("/", "");
}
- resourceName = resource.getFullName().replaceFirst("/", "");
- }
- mainJar.setRes(resourceName);
- sparkParameters.setMainJar(mainJar);
+ mainJar.setRes(resourceName);
+ sparkParameters.setMainJar(mainJar);
+
}
- }
- @Override
- public AbstractParameters getParameters() {
- return sparkParameters;
- }
+ @Override
+ public AbstractParameters getParameters() {
+ return sparkParameters;
+ }
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
index f030628..dd12029 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
@@ -98,35 +98,35 @@ public class FlinkArgsUtilsTest {
assertEquals("yarn-cluster", result.get(1));
assertEquals("-ys", result.get(2));
- assertSame(Integer.valueOf(result.get(3)),slot);
+ assertSame(slot, Integer.valueOf(result.get(3)));
- assertEquals("-ynm",result.get(4));
- assertEquals(result.get(5),appName);
+ assertEquals("-ynm", result.get(4));
+ assertEquals(appName, result.get(5));
assertEquals("-yn", result.get(6));
- assertSame(Integer.valueOf(result.get(7)),taskManager);
+ assertSame(taskManager, Integer.valueOf(result.get(7)));
assertEquals("-yjm", result.get(8));
- assertEquals(result.get(9),jobManagerMemory);
+ assertEquals(jobManagerMemory, result.get(9));
assertEquals("-ytm", result.get(10));
- assertEquals(result.get(11),taskManagerMemory);
+ assertEquals(taskManagerMemory, result.get(11));
assertEquals("-yqu", result.get(12));
- assertEquals(result.get(13),queue);
+ assertEquals(queue, result.get(13));
assertEquals("-p", result.get(14));
- assertSame(Integer.valueOf(result.get(15)),parallelism);
+ assertSame(parallelism, Integer.valueOf(result.get(15)));
assertEquals("-sae", result.get(16));
- assertEquals(result.get(17),others);
+ assertEquals(others, result.get(17));
assertEquals("-c", result.get(18));
- assertEquals(result.get(19),mainClass);
+ assertEquals(mainClass, result.get(19));
- assertEquals(result.get(20),mainJar.getRes());
- assertEquals(result.get(21),mainArgs);
+ assertEquals(mainJar.getRes(), result.get(20));
+ assertEquals(mainArgs, result.get(21));
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
index 6e55fa7..f76c2ea 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
@@ -17,19 +17,20 @@
package org.apache.dolphinscheduler.server.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
+
+import java.util.List;
+
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
/**
* Test SparkArgsUtils
*/
@@ -48,12 +49,11 @@ public class SparkArgsUtilsTest {
public int executorCores = 6;
public String sparkVersion = "SPARK1";
public int numExecutors = 4;
+ public String appName = "spark test";
public String queue = "queue1";
-
@Before
- public void setUp() throws Exception {
-
+ public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
@@ -78,6 +78,7 @@ public class SparkArgsUtilsTest {
param.setProgramType(programType);
param.setSparkVersion(sparkVersion);
param.setMainArgs(mainArgs);
+ param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
@@ -87,42 +88,46 @@ public class SparkArgsUtilsTest {
}
//Expected values and order
- assertEquals(result.size(),20);
+ assertEquals(22, result.size());
+
+ assertEquals("--master", result.get(0));
+ assertEquals("yarn", result.get(1));
+
+ assertEquals("--deploy-mode", result.get(2));
+ assertEquals(mode, result.get(3));
- assertEquals(result.get(0),"--master");
- assertEquals(result.get(1),"yarn");
+ assertEquals("--class", result.get(4));
+ assertEquals(mainClass, result.get(5));
- assertEquals(result.get(2),"--deploy-mode");
- assertEquals(result.get(3),mode);
+ assertEquals("--driver-cores", result.get(6));
+ assertSame(driverCores, Integer.valueOf(result.get(7)));
- assertEquals(result.get(4),"--class");
- assertEquals(result.get(5),mainClass);
+ assertEquals("--driver-memory", result.get(8));
+ assertEquals(driverMemory, result.get(9));
- assertEquals(result.get(6),"--driver-cores");
- assertSame(Integer.valueOf(result.get(7)),driverCores);
+ assertEquals("--num-executors", result.get(10));
+ assertSame(numExecutors, Integer.valueOf(result.get(11)));
- assertEquals(result.get(8),"--driver-memory");
- assertEquals(result.get(9),driverMemory);
+ assertEquals("--executor-cores", result.get(12));
+ assertSame(executorCores, Integer.valueOf(result.get(13)));
- assertEquals(result.get(10),"--num-executors");
- assertSame(Integer.valueOf(result.get(11)),numExecutors);
+ assertEquals("--executor-memory", result.get(14));
+ assertEquals(executorMemory, result.get(15));
- assertEquals(result.get(12),"--executor-cores");
- assertSame(Integer.valueOf(result.get(13)),executorCores);
+ assertEquals("--name", result.get(16));
+ assertEquals(ArgsUtils.escape(appName), result.get(17));
- assertEquals(result.get(14),"--executor-memory");
- assertEquals(result.get(15),executorMemory);
+ assertEquals("--queue", result.get(18));
+ assertEquals(queue, result.get(19));
- assertEquals(result.get(16),"--queue");
- assertEquals(result.get(17),queue);
- assertEquals(result.get(18),mainJar.getRes());
- assertEquals(result.get(19),mainArgs);
+ assertEquals(mainJar.getRes(), result.get(20));
+ assertEquals(mainArgs, result.get(21));
//Others param without --queue
SparkParameters param1 = new SparkParameters();
param1.setOthers("--files xxx/hive-site.xml");
param1.setQueue(queue);
result = SparkArgsUtils.buildArgs(param1);
- assertEquals(result.size(),7);
+ assertEquals(7, result.size());
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
index 78ac5c6..1dd07d2 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
@@ -79,6 +79,18 @@
</x-radio-group>
</div>
</m-list-box>
+ <m-list-box>
+ <div slot="text">{{$t('App Name')}}</div>
+ <div slot="content">
+ <x-input
+ :disabled="isDetails"
+ type="input"
+ v-model="appName"
+ :placeholder="$t('Please enter app name(optional)')"
+ autocomplete="off">
+ </x-input>
+ </div>
+ </m-list-box>
<div class="list-box-4p">
<div class="clearfix list">
<span class="sp1">{{$t('Driver Cores')}}</span>
@@ -241,6 +253,8 @@
executorMemory: '2G',
// Executor cores
executorCores: 2,
+ // Spark app name
+ appName: '',
// Main arguments
mainArgs: '',
// Option parameters
@@ -475,6 +489,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
+ appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@@ -534,6 +549,7 @@
numExecutors: this.numExecutors,
executorMemory: this.executorMemory,
executorCores: this.executorCores,
+ appName: this.appName,
mainArgs: this.mainArgs,
others: this.others,
programType: this.programType,
@@ -575,6 +591,7 @@
this.numExecutors = o.params.numExecutors || 2
this.executorMemory = o.params.executorMemory || '2G'
this.executorCores = o.params.executorCores || 2
+ this.appName = o.params.appName || ''
this.mainArgs = o.params.mainArgs || ''
this.others = o.params.others
this.programType = o.params.programType || 'SCALA'