lenboo commented on a change in pull request #4061:
URL:
https://github.com/apache/incubator-dolphinscheduler/pull/4061#discussion_r524092459
##########
File path:
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
##########
@@ -22,133 +23,132 @@
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+
/**
* spark task
*/
public class SparkTask extends AbstractYarnTask {
- /**
- * spark1 command
- */
- private static final String SPARK1_COMMAND =
"${SPARK_HOME1}/bin/spark-submit";
+ /**
+ * spark1 command
+ */
+ private static final String SPARK1_COMMAND =
"${SPARK_HOME1}/bin/spark-submit";
+
+ /**
+ * spark2 command
+ */
+ private static final String SPARK2_COMMAND =
"${SPARK_HOME2}/bin/spark-submit";
+
+ /**
+ * spark parameters
+ */
+ private SparkParameters sparkParameters;
+
+ /**
+ * taskExecutionContext
+ */
+ private final TaskExecutionContext sparkTaskExecutionContext;
+
+ public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger)
{
+ super(taskExecutionContext, logger);
+ this.sparkTaskExecutionContext = taskExecutionContext;
+ }
- /**
- * spark2 command
- */
- private static final String SPARK2_COMMAND =
"${SPARK_HOME2}/bin/spark-submit";
+ @Override
+ public void init() {
- /**
- * spark parameters
- */
- private SparkParameters sparkParameters;
+ logger.info("spark task params {}",
sparkTaskExecutionContext.getTaskParams());
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
+ sparkParameters =
JSONUtils.parseObject(sparkTaskExecutionContext.getTaskParams(),
SparkParameters.class);
- public SparkTask(TaskExecutionContext taskExecutionContext, Logger logger) {
- super(taskExecutionContext, logger);
- this.taskExecutionContext = taskExecutionContext;
- }
+ if (null == sparkParameters) {
+ logger.error("Spark params is null");
+ return;
+ }
- @Override
- public void init() {
+ if (!sparkParameters.checkParameters()) {
+ throw new RuntimeException("spark task params is not valid");
+ }
+ sparkParameters.setQueue(sparkTaskExecutionContext.getQueue());
+ setMainJarName();
+ }
- logger.info("spark task params {}", taskExecutionContext.getTaskParams());
+ /**
+ * create command
+ *
+ * @return command
+ */
+ @Override
+ protected String buildCommand() {
+ List<String> args = new ArrayList<>();
- sparkParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
SparkParameters.class);
+ //spark version
+ String sparkCommand = SPARK2_COMMAND;
- if (!sparkParameters.checkParameters()) {
- throw new RuntimeException("spark task params is not valid");
- }
- sparkParameters.setQueue(taskExecutionContext.getQueue());
+ if
(SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
+ sparkCommand = SPARK1_COMMAND;
+ }
- setMainJarName();
+ args.add(sparkCommand);
- if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
- String args = sparkParameters.getMainArgs();
+ // other parameters
+ args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- sparkParameters.getLocalParametersMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // replace placeholder
+ Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(sparkTaskExecutionContext.getDefinedParams()),
+ sparkTaskExecutionContext.getDefinedParams(),
+ sparkParameters.getLocalParametersMap(),
+ CommandType.of(sparkTaskExecutionContext.getCmdTypeIfComplement()),
+ sparkTaskExecutionContext.getScheduleTime());
- if (paramsMap != null ){
- args = ParameterUtils.convertParameterPlaceholders(args,
ParamUtils.convert(paramsMap));
- }
- sparkParameters.setMainArgs(args);
- }
- }
+ String command = null;
- /**
- * create command
- * @return command
- */
- @Override
- protected String buildCommand() {
- List<String> args = new ArrayList<>();
+ if (null != paramsMap) {
+ command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
ParamUtils.convert(paramsMap));
+ }
- //spark version
- String sparkCommand = SPARK2_COMMAND;
+ logger.info("spark task command: {}", command);
- if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
- sparkCommand = SPARK1_COMMAND;
+ return command;
}
- args.add(sparkCommand);
-
- // other parameters
- args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
-
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
-
- logger.info("spark task command : {}", command);
-
- return command;
- }
-
- @Override
- protected void setMainJarName() {
- // main jar
- ResourceInfo mainJar = sparkParameters.getMainJar();
- if (mainJar != null) {
- int resourceId = mainJar.getId();
- String resourceName;
- if (resourceId == 0) {
- resourceName = mainJar.getRes();
- } else {
- Resource resource =
processService.getResourceById(sparkParameters.getMainJar().getId());
- if (resource == null) {
- logger.error("resource id: {} not exist", resourceId);
- throw new RuntimeException(String.format("resource id: %d not
exist", resourceId));
+ @Override
+ protected void setMainJarName() {
+ // main jar
+ ResourceInfo mainJar = sparkParameters.getMainJar();
+ if (mainJar != null) {
Review comment:
I think it would be better to return early or throw an exception if it is null.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]