This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8eaf5a2309 [Feature-10219][EMR] EMR supports use <add-Steps> to add steps to an existing cluster (#10657)
8eaf5a2309 is described below
commit 8eaf5a230998946001b80c9f373da844e6efcd21
Author: ZhaoGuodong <[email protected]>
AuthorDate: Sun Jul 10 10:15:25 2022 +0800
[Feature-10219][EMR] EMR supports use <add-Steps> to add steps to an existing cluster (#10657)
* Add the ProgramType parameter to distinguish task types
* EmrAddStepsTask supports Add-Steps
* UI supports Add-Steps
* Rename class EmrTask to EmrJobFlowTask
* add EMR Task abstract base class AbstractEmrTask
* add test case for EmrAddStepsTask
* Update help documents
---
docs/docs/en/guide/task/emr.md | 48 ++++-
docs/docs/zh/guide/task/emr.md | 47 ++++-
docs/img/tasks/demo/emr_add_job_flow_steps.png | Bin 0 -> 131722 bytes
docs/img/tasks/demo/emr_jobFlowId.png | Bin 0 -> 133514 bytes
docs/img/tasks/demo/emr_run_job_flow.png | Bin 0 -> 139410 bytes
.../plugin/task/emr/AbstractEmrTask.java | 113 ++++++++++++
.../plugin/task/emr/EmrAddStepsTask.java | 177 ++++++++++++++++++
.../task/emr/{EmrTask.java => EmrJobFlowTask.java} | 86 +--------
.../plugin/task/emr/EmrParameters.java | 39 ++--
.../plugin/task/emr/EmrTaskChannel.java | 10 +-
.../plugin/task/emr/ProgramType.java | 40 ++---
.../plugin/task/emr/EmrAddStepsTaskTest.java | 198 +++++++++++++++++++++
.../{EmrTaskTest.java => EmrJobFlowTaskTest.java} | 47 ++---
.../plugin/task/emr/EmrAddStepsDefine.json | 17 ++
.../plugin/task/emr/EmrErrorAddStepsDefine.json | 29 +++
dolphinscheduler-ui/src/locales/en_US/project.ts | 2 +
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 2 +
.../task/components/node/fields/use-emr.ts | 41 +++++
.../projects/task/components/node/format-data.ts | 2 +
.../projects/task/components/node/tasks/use-emr.ts | 1 +
.../views/projects/task/components/node/types.ts | 1 +
21 files changed, 750 insertions(+), 150 deletions(-)
diff --git a/docs/docs/en/guide/task/emr.md b/docs/docs/en/guide/task/emr.md
index 050d7c2397..ebcaa885aa 100644
--- a/docs/docs/en/guide/task/emr.md
+++ b/docs/docs/en/guide/task/emr.md
@@ -2,7 +2,11 @@
## Overview
-Amazon EMR task type, for creating EMR clusters on AWS and running computing tasks. Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the background code, to transfer JSON parameters to [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object and submit to AWS.
+Amazon EMR task type, for operating EMR clusters on AWS and running computing tasks.
+The background code uses [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) to convert the JSON parameters into a task object and submit it to AWS. Two program types are currently supported:
+
+* `RUN_JOB_FLOW`: uses [API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) to submit a [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object
+* `ADD_JOB_FLOW_STEPS`: uses [API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) to submit an [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object
## Create Task
@@ -19,12 +23,18 @@ Amazon EMR task type, for creating EMR clusters on AWS and running computing tas
| Task priority | When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order. |
| Worker grouping | Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution. |
| Times of failed retry attempts | The number of times the task failed to resubmit. You can select from drop-down or fill-in a number. |
-| Failed retry interval: The time interval for resubmitting the task after a failed task. You can select from drop-down or fill-in a number. |
+| Failed retry interval | The time interval before a failed task is resubmitted. You can select from the drop-down or fill in a number. |
| Timeout alarm | Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail. |
-| JSON | JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object, for details refer to [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples). |
+| Program Type | Select the program type. For `RUN_JOB_FLOW`, fill in `jobFlowDefineJson`; for `ADD_JOB_FLOW_STEPS`, fill in `stepsDefineJson`. |
+| jobFlowDefineJson | JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object. For details, refer to [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples). |
+| stepsDefineJson | JSON corresponding to the [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object. For details, refer to [API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples). |
-## JSON example
+## Task Example
+### Create an EMR cluster and run Steps
+This example shows how to create an `EMR` task node of type `RUN_JOB_FLOW`. Using `SparkPi` as the example, the task creates an `EMR` cluster and runs the `SparkPi` sample program.
+
+jobFlowDefineJson example
```json
{
"Name": "SparkPi",
@@ -65,3 +75,33 @@ Amazon EMR task type, for creating EMR clusters on AWS and running computing tas
}
```
+### Add a Step to a Running EMR Cluster
+This example shows how to create an `EMR` task node of type `ADD_JOB_FLOW_STEPS`. Using `SparkPi` as the example, the task adds a `SparkPi` step to a running `EMR` cluster.
+
+
+
+stepsDefineJson example
+```json
+{
+ "JobFlowId": "j-3V628TKAERHP8",
+ "Steps": [
+ {
+ "Name": "calculate_pi",
+ "ActionOnFailure": "CONTINUE",
+ "HadoopJarStep": {
+ "Jar": "command-runner.jar",
+ "Args": [
+ "/usr/lib/spark/bin/run-example",
+ "SparkPi",
+ "15"
+ ]
+ }
+ }
+ ]
+}
+```
+
+## Notice
+
+- Failover for the EMR task type has not been implemented yet. Currently, DolphinScheduler only supports failover for the YARN task type; other task types, such as EMR and k8s tasks, are not supported yet.
+- `stepsDefineJson`: a task definition supports only a single step, which better ensures the reliability of the task state.
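The single-step rule above is enforced in `createAddJobFlowStepsRequest` (in `EmrAddStepsTask`) with a `size() > 1` check. An empty `Steps` array would pass that check and only fail later at `result.getStepIds().get(0)`. The following minimal Java sketch shows a stricter guard that accepts exactly one step; the class and method names are hypothetical and not part of DolphinScheduler.

```java
import java.util.List;

/** Hypothetical helper illustrating an "exactly one step" guard; not DolphinScheduler code. */
final class SingleStepGuard {

    private SingleStepGuard() {
    }

    /**
     * Returns the only element of {@code steps}.
     *
     * @throws IllegalArgumentException if the list is null, empty, or has more than one element
     */
    static <T> T requireSingleStep(List<T> steps) {
        int count = steps == null ? 0 : steps.size();
        if (count != 1) {
            throw new IllegalArgumentException("exactly one step is supported, got " + count);
        }
        return steps.get(0);
    }
}
```

Checking `count != 1` up front rejects both the multi-step case the commit targets and the empty case that would otherwise surface as an `IndexOutOfBoundsException`.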
diff --git a/docs/docs/zh/guide/task/emr.md b/docs/docs/zh/guide/task/emr.md
index dfa17f6658..59dab0046e 100644
--- a/docs/docs/zh/guide/task/emr.md
+++ b/docs/docs/zh/guide/task/emr.md
@@ -2,7 +2,11 @@
## Overview
-Amazon EMR task type, used to create EMR clusters on AWS and run computing tasks. The backend uses [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) to convert the JSON parameters into a [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object and submit it to AWS.
+Amazon EMR task type, used to operate EMR clusters on AWS and run computing tasks.
+The backend uses [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) to convert the JSON parameters into a task object and submit it to AWS. Two program types are currently supported:
+
+* `RUN_JOB_FLOW`: uses [API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples) to submit a [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object
+* `ADD_JOB_FLOW_STEPS`: uses [API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples) to submit an [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object
## Task Parameters
- Node name: node names within a workflow definition are unique.
@@ -13,9 +17,16 @@ Amazon EMR task type, used to create EMR clusters on AWS and run computing tasks.
- Failed retry attempts: the number of times a failed task is resubmitted; supports drop-down selection and manual input.
- Failed retry interval: the time interval before a failed task is resubmitted; supports drop-down selection and manual input.
- Timeout alarm: check timeout alarm and timeout failure; when the task exceeds the "timeout duration", an alarm email is sent and the task fails.
-- json: the JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object; for the detailed JSON definition, see [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
+- Program type: select the program type. For `RUN_JOB_FLOW`, fill in `jobFlowDefineJson`; for `ADD_JOB_FLOW_STEPS`, fill in `stepsDefineJson`.
+ - jobFlowDefineJson: the JSON corresponding to the [RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html) object; for the detailed JSON definition, see [API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
+ - stepsDefineJson: the JSON corresponding to the [AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html) object; for the detailed JSON definition, see [API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples)
+
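+## Task Example
+### Create an EMR Cluster and Run Steps
+This example shows how to create an `EMR` task node of type `RUN_JOB_FLOW`. Using `SparkPi` as the example, the task creates an `EMR` cluster and runs the `SparkPi` sample program.
+
-## JSON parameter example
+jobFlowDefineJson parameter example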
```json
{
"Name": "SparkPi",
@@ -56,3 +67,33 @@ Amazon EMR task type, used to create EMR clusters on AWS and run computing tasks.
}
```
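+### Add a Step to a Running EMR Cluster
+This example shows how to create an `EMR` task node of type `ADD_JOB_FLOW_STEPS`. Using `SparkPi` as the example, the task adds a `SparkPi` step to a running `EMR` cluster.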
+
+
+
+stepsDefineJson parameter example
+```json
+{
+ "JobFlowId": "j-3V628TKAERHP8",
+ "Steps": [
+ {
+ "Name": "calculate_pi",
+ "ActionOnFailure": "CONTINUE",
+ "HadoopJarStep": {
+ "Jar": "command-runner.jar",
+ "Args": [
+ "/usr/lib/spark/bin/run-example",
+ "SparkPi",
+ "15"
+ ]
+ }
+ }
+ ]
+}
+```
+
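+## Notice
+
+- Failover for the EMR task type has not been implemented yet. Currently, DolphinScheduler only supports failover for the YARN task type; other task types, such as EMR and k8s tasks, are not supported yet.
+- `stepsDefineJson`: a task definition supports only a single step, which better ensures the reliability of the task state.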
\ No newline at end of file
diff --git a/docs/img/tasks/demo/emr_add_job_flow_steps.png
b/docs/img/tasks/demo/emr_add_job_flow_steps.png
new file mode 100644
index 0000000000..bc8de30f72
Binary files /dev/null and b/docs/img/tasks/demo/emr_add_job_flow_steps.png
differ
diff --git a/docs/img/tasks/demo/emr_jobFlowId.png
b/docs/img/tasks/demo/emr_jobFlowId.png
new file mode 100644
index 0000000000..b79097afd5
Binary files /dev/null and b/docs/img/tasks/demo/emr_jobFlowId.png differ
diff --git a/docs/img/tasks/demo/emr_run_job_flow.png
b/docs/img/tasks/demo/emr_run_job_flow.png
new file mode 100644
index 0000000000..ad09c5f7c9
Binary files /dev/null and b/docs/img/tasks/demo/emr_run_job_flow.png differ
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
new file mode 100644
index 0000000000..329c3bc3d5
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.emr;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import java.util.TimeZone;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
+import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+/**
+ * EMR Task abstract base class
+ *
+ * @since v3.1.0
+ */
+public abstract class AbstractEmrTask extends AbstractTaskExecutor {
+
+ final TaskExecutionContext taskExecutionContext;
+ EmrParameters emrParameters;
+ AmazonElasticMapReduce emrClient;
+ String clusterId;
+
+
+ /**
+ * Configure ObjectMapper features and the property naming strategy.
+ * UpperCamelCaseStrategy is used so that capitalized JSON keys (e.g. "JobFlowId") can be parsed.
+ *
+ * @see PropertyNamingStrategy.UpperCamelCaseStrategy
+ */
+ static final ObjectMapper objectMapper = new ObjectMapper()
+ .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+ .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+ .setTimeZone(TimeZone.getDefault())
+ .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+
+ /**
+ * constructor
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ protected AbstractEmrTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ this.taskExecutionContext = taskExecutionContext;
+ }
+
+ @Override
+ public void init() {
+ final String taskParams = taskExecutionContext.getTaskParams();
+ logger.info("emr task params:{}", taskParams);
+ emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
+ if (emrParameters == null || !emrParameters.checkParameters()) {
+ throw new EmrTaskException("emr task params is not valid");
+ }
+ emrClient = createEmrClient();
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return emrParameters;
+ }
+
+ /**
+ * create emr client from BasicAWSCredentials
+ *
+ * @return AmazonElasticMapReduce
+ */
+ private AmazonElasticMapReduce createEmrClient() {
+
+ final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+ final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+ final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+ final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+ final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+ // create an EMR client
+ return AmazonElasticMapReduceClientBuilder.standard()
+ .withCredentials(awsCredentialsProvider)
+ .withRegion(awsRegion)
+ .build();
+ }
+}
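The `UpperCamelCaseStrategy` configured on `objectMapper` above is what lets capitalized keys in the JSON examples, such as `JobFlowId`, `Steps` and `HadoopJarStep`, bind to the AWS SDK bean properties (`setJobFlowId`, `setSteps`, and so on). Jackson's strategy upper-cases the first character of each logical property name; the JDK-only sketch below closely mimics that translation for illustration and is not Jackson's own code (the class name is hypothetical).

```java
/** Illustrative, JDK-only imitation of an upper-camel-case property naming rule. */
final class UpperCamelCaseSketch {

    private UpperCamelCaseSketch() {
    }

    /** Translates a Java bean property name (e.g. "jobFlowId") to its JSON key ("JobFlowId"). */
    static String translate(String propertyName) {
        if (propertyName == null || propertyName.isEmpty()) {
            return propertyName;
        }
        char first = propertyName.charAt(0);
        char upper = Character.toUpperCase(first);
        if (first == upper) {
            // Already capitalized (or not a letter): nothing to change.
            return propertyName;
        }
        // Only the first character changes; the rest of the name is kept as-is.
        return upper + propertyName.substring(1);
    }
}
```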
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
new file mode 100644
index 0000000000..d747577b71
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.emr;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
+import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsInfo;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequest;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequestStatus;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsResult;
+import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
+import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
+import com.amazonaws.services.elasticmapreduce.model.StepState;
+import com.amazonaws.services.elasticmapreduce.model.StepStatus;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Sets;
+
+/**
+ * AddJobFlowSteps task executor
+ *
+ * @since v3.1.0
+ */
+public class EmrAddStepsTask extends AbstractEmrTask {
+
+ private String stepId;
+
+ private final HashSet<String> waitingStateSet = Sets.newHashSet(
+ StepState.PENDING.toString(),
+ StepState.CANCEL_PENDING.toString(),
+ StepState.RUNNING.toString()
+ );
+
+ /**
+ * constructor
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ protected EmrAddStepsTask(TaskExecutionContext taskExecutionContext) {
+ super(taskExecutionContext);
+ }
+
+ @Override
+ public void handle() throws InterruptedException {
+ StepStatus stepStatus = null;
+ try {
+ AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest();
+
+ // submit addJobFlowStepsRequest to aws
+ AddJobFlowStepsResult result = emrClient.addJobFlowSteps(addJobFlowStepsRequest);
+
+ clusterId = addJobFlowStepsRequest.getJobFlowId();
+ stepId = result.getStepIds().get(0);
+ // use clusterId-stepId as appIds
+ setAppIds(clusterId + TaskConstants.SUBTRACT_STRING + stepId);
+
+ stepStatus = getStepStatus();
+
+ while (waitingStateSet.contains(stepStatus.getState())) {
+ TimeUnit.SECONDS.sleep(10);
+ stepStatus = getStepStatus();
+ }
+
+ } catch (EmrTaskException | SdkBaseException e) {
+ logger.error("emr task submit failed with error", e);
+ } finally {
+ final int exitStatusCode = calculateExitStatusCode(stepStatus);
+ setExitStatusCode(exitStatusCode);
+ logger.info("emr task finished with step status : {}", stepStatus);
+ }
+ }
+
+ /**
+ * parse json string to AddJobFlowStepsRequest
+ *
+ * @return AddJobFlowStepsRequest
+ */
+ private AddJobFlowStepsRequest createAddJobFlowStepsRequest() {
+
+ final AddJobFlowStepsRequest addJobFlowStepsRequest;
+ try {
+ addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
+ } catch (JsonProcessingException e) {
+ throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e);
+ }
+
+ // Tracking the task state becomes complex when a single task definition is associated with multiple steps.
+ // Therefore, a task definition supports only a single step, which better ensures the reliability of the task state.
+ if (addJobFlowStepsRequest.getSteps().size() > 1) {
+ throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step");
+ }
+
+ return addJobFlowStepsRequest;
+ }
+
+ /**
+ * calculate task exitStatusCode
+ *
+ * @param stepStatus aws emr execution status details of the cluster step.
+ * @return exitStatusCode
+ */
+ private int calculateExitStatusCode(StepStatus stepStatus) {
+ if (stepStatus == null) {
+ return TaskConstants.EXIT_CODE_FAILURE;
+ } else {
+ String state = stepStatus.getState();
+ StepState stepState = StepState.valueOf(state);
+ switch (stepState) {
+ case COMPLETED:
+ return TaskConstants.EXIT_CODE_SUCCESS;
+ case CANCELLED:
+ return TaskConstants.EXIT_CODE_KILL;
+ default:
+ return TaskConstants.EXIT_CODE_FAILURE;
+ }
+ }
+
+ }
+
+ private StepStatus getStepStatus() {
+ DescribeStepRequest describeStepRequest = new DescribeStepRequest().withClusterId(clusterId).withStepId(stepId);
+ DescribeStepResult result = emrClient.describeStep(describeStepRequest);
+ if (result == null) {
+ throw new EmrTaskException("fetch step status failed");
+ }
+ StepStatus stepStatus = result.getStep().getStatus();
+ logger.info("emr step [clusterId:{}, stepId:{}] running with status:{}", clusterId, stepId, stepStatus);
+ return stepStatus;
+
+ }
+
+ @Override
+ public void cancelApplication(boolean status) throws Exception {
+ super.cancelApplication(status);
+ logger.info("trying to cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
+ CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
+ CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
+
+ if (cancelStepsResult == null) {
+ throw new EmrTaskException("cancel emr step failed");
+ }
+
+ CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
+ .stream()
+ .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
+ .findFirst()
+ .orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
+
+ if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
+ throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());
+ }
+
+ logger.info("the result of cancel emr step is:{}", cancelStepsResult);
+ }
+
+}
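`handle()` above polls `DescribeStep` every 10 seconds while the step is `PENDING`, `CANCEL_PENDING` or `RUNNING`, and `calculateExitStatusCode` then maps the terminal state to an exit code (`COMPLETED` to success, `CANCELLED` to kill, anything else or a missing status to failure). The JDK-only sketch below reproduces that control flow under stated assumptions: `StepStateSketch` is a hypothetical mirror of the EMR step states, a `Supplier` stands in for the AWS call, and the exit-code constants are placeholders rather than the values defined in `TaskConstants`.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical mirror of the EMR step states; not the AWS SDK enum. */
enum StepStateSketch { PENDING, CANCEL_PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, INTERRUPTED }

final class StepPollingSketch {
    // Placeholder exit codes; the real values live in TaskConstants.
    static final int EXIT_CODE_SUCCESS = 0;
    static final int EXIT_CODE_FAILURE = -1;
    static final int EXIT_CODE_KILL = 137;

    // Non-terminal states: keep polling while the step is in one of these.
    private static final Set<StepStateSketch> WAITING =
        EnumSet.of(StepStateSketch.PENDING, StepStateSketch.CANCEL_PENDING, StepStateSketch.RUNNING);

    private StepPollingSketch() {
    }

    /** Polls fetchState until it returns a terminal state, sleeping between calls. */
    static StepStateSketch waitForTerminalState(Supplier<StepStateSketch> fetchState, long sleepMillis)
            throws InterruptedException {
        StepStateSketch state = fetchState.get();
        while (WAITING.contains(state)) {
            Thread.sleep(sleepMillis);
            state = fetchState.get();
        }
        return state;
    }

    /** Maps a terminal state (or null when submission failed) to an exit code. */
    static int exitCodeOf(StepStateSketch state) {
        if (state == null) {
            return EXIT_CODE_FAILURE;
        }
        switch (state) {
            case COMPLETED:
                return EXIT_CODE_SUCCESS;
            case CANCELLED:
                return EXIT_CODE_KILL;
            default:
                return EXIT_CODE_FAILURE;
        }
    }
}
```

Keeping the waiting states in one set means a new non-terminal state only has to be added in one place, which is the same reason the original code uses `waitingStateSet`.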
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
similarity index 61%
rename from
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java
rename to
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index ada0041bf9..ed42de2831 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -17,28 +17,13 @@
package org.apache.dolphinscheduler.plugin.task.emr;
-import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
-import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
-import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
-import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
-
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import java.util.HashSet;
-import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
-import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.ClusterState;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReasonCode;
@@ -50,23 +35,9 @@ import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.common.collect.Sets;
-public class EmrTask extends AbstractTaskExecutor {
-
- /**
- * taskExecutionContext
- */
- private final TaskExecutionContext taskExecutionContext;
- /**
- * emr parameters
- */
- private EmrParameters emrParameters;
- private AmazonElasticMapReduce emrClient;
-
- private String clusterId;
+public class EmrJobFlowTask extends AbstractEmrTask {
private final HashSet<String> waitingStateSet = Sets.newHashSet(
ClusterState.STARTING.toString(),
@@ -74,40 +45,13 @@ public class EmrTask extends AbstractTaskExecutor {
ClusterState.RUNNING.toString()
);
- /**
- * config ObjectMapper features and propertyNamingStrategy
- * use UpperCamelCaseStrategy support capital letters parse
- * @see PropertyNamingStrategy.UpperCamelCaseStrategy
- */
- private static final ObjectMapper objectMapper = new ObjectMapper()
- .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
- .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
- .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
- .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
- .setTimeZone(TimeZone.getDefault())
- .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
-
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
- protected EmrTask(TaskExecutionContext taskExecutionContext) {
-
+ protected EmrJobFlowTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
- this.taskExecutionContext = taskExecutionContext;
- }
-
- @Override
- public void init() {
-
- final String taskParams = taskExecutionContext.getTaskParams();
- logger.info("emr task params:{}", taskParams);
- emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
- if (emrParameters == null || !emrParameters.checkParameters()) {
- throw new EmrTaskException("emr task params is not valid");
- }
- emrClient = createEmrClient();
}
@Override
@@ -120,7 +64,7 @@ public class EmrTask extends AbstractTaskExecutor {
RunJobFlowResult result = emrClient.runJobFlow(runJobFlowRequest);
clusterId = result.getJobFlowId();
- // TODO: Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task type . Other task type, such as EMR task, k8s task not ready yet.
+ // Failover for the EMR task type has not been implemented yet. Currently, DS only supports failover for the YARN task type; other task types, such as EMR and k8s tasks, are not supported yet.
setAppIds(clusterId);
clusterStatus = getClusterStatus();
@@ -199,30 +143,6 @@ public class EmrTask extends AbstractTaskExecutor {
}
- @Override
- public AbstractParameters getParameters() {
- return emrParameters;
- }
-
- /**
- * create emr client from BasicAWSCredentials
- *
- * @return AmazonElasticMapReduce
- */
- private AmazonElasticMapReduce createEmrClient() {
-
- final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
- final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
- final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
- final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
- final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
- // create an EMR client
- return AmazonElasticMapReduceClientBuilder.standard()
- .withCredentials(awsCredentialsProvider)
- .withRegion(awsRegion)
- .build();
- }
-
@Override
public void cancelApplication(boolean status) throws Exception {
super.cancelApplication(status);
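After this change, `EmrJobFlowTask` and `EmrAddStepsTask` share `init()` (parameter parsing, validation and EMR client creation) through `AbstractEmrTask`, and each implements only its own `handle()` and `cancelApplication()` logic. A minimal, JDK-only sketch of that template-method split follows; the class names are hypothetical, and a boolean flag stands in for the AWS client.

```java
/** Hypothetical base class mirroring the shared lifecycle in AbstractEmrTask. */
abstract class EmrTaskTemplateSketch {
    protected String taskParams;
    protected boolean clientReady;

    /** Shared step: validate the raw params once, then "create" the client. */
    final void init(String rawParams) {
        if (rawParams == null || rawParams.trim().isEmpty()) {
            throw new IllegalStateException("emr task params is not valid");
        }
        taskParams = rawParams;
        clientReady = true; // stands in for createEmrClient()
    }

    /** Program-type-specific submission logic supplied by each subclass. */
    abstract String handle();
}

/** Stand-in for EmrJobFlowTask. */
final class JobFlowTaskSketch extends EmrTaskTemplateSketch {
    @Override
    String handle() {
        return "RunJobFlow(" + taskParams + ")";
    }
}

/** Stand-in for EmrAddStepsTask. */
final class AddStepsTaskSketch extends EmrTaskTemplateSketch {
    @Override
    String handle() {
        return "AddJobFlowSteps(" + taskParams + ")";
    }
}
```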
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
index 14396fcebc..1e38a66d81 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
@@ -24,18 +24,41 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
public class EmrParameters extends AbstractParameters {
+ /**
+ * emr program type
+ * 0 RUN_JOB_FLOW, 1 ADD_JOB_FLOW_STEPS
+ */
+ private ProgramType programType;
+
/**
* job flow define in json format
+ *
* @see <a
href="https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples">API_RunJobFlow_Examples</a>
*/
private String jobFlowDefineJson;
+ /**
+ * steps define in json format
+ *
+ * @see <a
href="https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples">API_AddJobFlowSteps_Examples</a>
+ */
+ private String stepsDefineJson;
+
@Override
public boolean checkParameters() {
-
- return StringUtils.isNotEmpty(jobFlowDefineJson);
+ /*
+ * When saving a task, the programType cannot be empty and jobFlowDefineJson or stepsDefineJson cannot be empty:
+ * (1) When ProgramType is RUN_JOB_FLOW, jobFlowDefineJson cannot be empty.
+ * (2) When ProgramType is ADD_JOB_FLOW_STEPS, stepsDefineJson cannot be empty.
+ */
+ return programType != null && (StringUtils.isNotEmpty(jobFlowDefineJson) || StringUtils.isNotEmpty(stepsDefineJson));
}
@Override
@@ -44,18 +67,12 @@ public class EmrParameters extends AbstractParameters {
}
- public String getJobFlowDefineJson() {
- return jobFlowDefineJson;
- }
-
- public void setJobFlowDefineJson(String jobFlowDefineJson) {
- this.jobFlowDefineJson = jobFlowDefineJson;
- }
-
@Override
public String toString() {
return "EmrParameters{"
- + "jobFlowDefineJson='" + jobFlowDefineJson + '\''
+ + "programType=" + programType
+ + ", jobFlowDefineJson='" + jobFlowDefineJson + '\''
+ + ", stepsDefineJson='" + stepsDefineJson + '\''
+ '}';
}
}
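The comment above `checkParameters` requires the JSON that matches the selected program type, while the committed expression accepts either field once `programType` is set (so a `RUN_JOB_FLOW` task carrying only `stepsDefineJson` would pass). A stricter check could be sketched as follows. `EmrParamsCheck`, `isValid` and the local `isNotEmpty` (a stand-in for `StringUtils.isNotEmpty`) are hypothetical names, not DolphinScheduler APIs.

```java
/**
 * Hypothetical sketch: EmrParamsCheck and isValid(...) are illustrative names,
 * not DolphinScheduler APIs. The nested enum mirrors the ProgramType values
 * added in this commit.
 */
class EmrParamsCheck {

    enum ProgramType { RUN_JOB_FLOW, ADD_JOB_FLOW_STEPS }

    // Local stand-in for StringUtils.isNotEmpty: true when non-null and non-empty.
    private static boolean isNotEmpty(String s) {
        return s != null && !s.isEmpty();
    }

    /**
     * Requires the JSON field that matches the selected program type:
     * RUN_JOB_FLOW needs jobFlowDefineJson, ADD_JOB_FLOW_STEPS needs stepsDefineJson.
     */
    static boolean isValid(ProgramType programType, String jobFlowDefineJson, String stepsDefineJson) {
        if (programType == null) {
            return false;
        }
        switch (programType) {
            case RUN_JOB_FLOW:
                return isNotEmpty(jobFlowDefineJson);
            case ADD_JOB_FLOW_STEPS:
                return isNotEmpty(stepsDefineJson);
            default:
                return false;
        }
    }
}
```

With this shape, a definition whose JSON does not match its program type is rejected at save time rather than passing validation.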
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
index 898362f2cc..e59b4e3613 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
@@ -33,7 +33,15 @@ public class EmrTaskChannel implements TaskChannel {
@Override
public AbstractTask createTask(TaskExecutionContext taskRequest) {
- return new EmrTask(taskRequest);
+ EmrParameters emrParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), EmrParameters.class);
+ assert emrParameters != null;
+ if (ProgramType.RUN_JOB_FLOW.equals(emrParameters.getProgramType())) {
+ return new EmrJobFlowTask(taskRequest);
+ } else if (ProgramType.ADD_JOB_FLOW_STEPS.equals(emrParameters.getProgramType())) {
+ return new EmrAddStepsTask(taskRequest);
+ } else {
+ throw new IllegalArgumentException("Unsupported program type: " + emrParameters.getProgramType());
+ }
}
@Override
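`createTask` guards the parsed parameters with `assert`, which the JVM skips unless assertions are enabled with `-ea`; without it, a null parse result would surface as a `NullPointerException` at `getProgramType()`. The dispatch itself can be sketched with an explicit null check and a `switch` over the enum. `EmrTaskDispatch` and `TaskKind` are hypothetical; `TaskKind` stands in for the two concrete task classes so the sketch stays self-contained.

```java
/**
 * Hypothetical sketch of the createTask dispatch. TaskKind stands in for the
 * concrete task classes (EmrJobFlowTask, EmrAddStepsTask); none of these names
 * are DolphinScheduler APIs.
 */
class EmrTaskDispatch {

    enum ProgramType { RUN_JOB_FLOW, ADD_JOB_FLOW_STEPS }

    // Placeholder for the AbstractTask subclass that would be instantiated.
    enum TaskKind { JOB_FLOW_TASK, ADD_STEPS_TASK }

    static TaskKind select(ProgramType programType) {
        // Explicit check instead of `assert`, which is a no-op unless -ea is set.
        if (programType == null) {
            throw new IllegalArgumentException("programType must not be null");
        }
        switch (programType) {
            case RUN_JOB_FLOW:
                return TaskKind.JOB_FLOW_TASK;
            case ADD_JOB_FLOW_STEPS:
                return TaskKind.ADD_STEPS_TASK;
            default:
                throw new IllegalArgumentException("Unsupported program type: " + programType);
        }
    }
}
```

Because the UI submits the enum name as a string (for example `'ADD_JOB_FLOW_STEPS'`), `ProgramType.valueOf(name)` resolves it to the same constant the switch handles.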
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java
similarity index 55%
copy from dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java
index a67b370d18..0f0870c119 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java
@@ -14,31 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { useI18n } from 'vue-i18n'
-import { useCustomParams } from '.'
-import type { IJsonItem } from '../types'
-export function useEmr(model: { [field: string]: any }): IJsonItem[] {
- const { t } = useI18n()
+package org.apache.dolphinscheduler.plugin.task.emr;
- return [
- {
- type: 'editor',
- field: 'jobFlowDefineJson',
- name: t('project.node.emr_flow_define_json'),
- props: {
- language: 'json'
- },
- validate: {
- trigger: ['input', 'trigger'],
- required: true,
- message: t('project.node.emr_flow_define_json_tips')
- }
- },
- ...useCustomParams({
- model,
- field: 'localParams',
- isSimple: true
- })
- ]
+/**
+ * emr program type
+ *
+ * @since v3.1.0
+ */
+public enum ProgramType {
+ /**
+ * RunJobFlow
+ */
+ RUN_JOB_FLOW,
+ /**
+ * AddJobFlowSteps
+ */
+ ADD_JOB_FLOW_STEPS
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
new file mode 100644
index 0000000000..d74d36fa82
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.emr;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
+import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
+import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
+import com.amazonaws.services.elasticmapreduce.model.AmazonElasticMapReduceException;
+import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
+import com.amazonaws.services.elasticmapreduce.model.Step;
+import com.amazonaws.services.elasticmapreduce.model.StepState;
+import com.amazonaws.services.elasticmapreduce.model.StepStatus;
+
+/**
+ * EmrAddStepsTask Test
+ *
+ * @since v3.1.0
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ AmazonElasticMapReduceClientBuilder.class,
+ EmrAddStepsTask.class,
+ AmazonElasticMapReduce.class,
+ JSONUtils.class
+})
+@PowerMockIgnore({"javax.*"})
+public class EmrAddStepsTaskTest {
+
+ private final StepStatus pendingState =
+ new StepStatus().withState(StepState.PENDING);
+
+ private final StepStatus runningState =
+ new StepStatus().withState(StepState.RUNNING);
+
+ private final StepStatus completedState =
+ new StepStatus().withState(StepState.COMPLETED);
+
+ private final StepStatus cancelledState =
+ new StepStatus().withState(StepState.CANCELLED);
+
+ private final StepStatus failedState =
+ new StepStatus().withState(StepState.FAILED);
+
+ private EmrAddStepsTask emrAddStepsTask;
+ private AmazonElasticMapReduce emrClient;
+ private Step step;
+
+ @Before
+ public void before() throws Exception {
+ // mock EmrParameters and EmrAddStepsTask
+ EmrParameters emrParameters = buildEmrTaskParameters();
+ String emrParametersString = JSONUtils.toJsonString(emrParameters);
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString);
+ emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
+
+ // mock emrClient and behavior
+ emrClient = mock(AmazonElasticMapReduce.class);
+
+ AddJobFlowStepsResult addJobFlowStepsResult = mock(AddJobFlowStepsResult.class);
+ when(emrClient.addJobFlowSteps(any())).thenReturn(addJobFlowStepsResult);
+ when(addJobFlowStepsResult.getStepIds()).thenReturn(Collections.singletonList("step-xx"));
+
+ doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
+ DescribeStepResult describeStepResult = mock(DescribeStepResult.class);
+ when(emrClient.describeStep(any())).thenReturn(describeStepResult);
+
+ // mock step
+ step = mock(Step.class);
+ when(describeStepResult.getStep()).thenReturn(step);
+
+ emrAddStepsTask.init();
+ }
+
+ @Test
+ public void testCanNotParseJson() throws Exception {
+ mockStatic(JSONUtils.class);
+ when(emrAddStepsTask, "createAddJobFlowStepsRequest").thenThrow(new EmrTaskException("can not parse AddJobFlowStepsRequest from json", new Exception("error")));
+ emrAddStepsTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+ }
+
+ @Test
+ public void testDefineJsonStepNotOne() throws Exception {
+ // mock EmrParameters and EmrAddStepsTask
+ EmrParameters emrParameters = buildErrorEmrTaskParameters();
+ String emrParametersString = JSONUtils.toJsonString(emrParameters);
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString);
+ emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
+ doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
+ emrAddStepsTask.init();
+ emrAddStepsTask.handle();
+
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+ }
+
+ @Test
+ public void testHandle() throws Exception {
+ when(step.getStatus()).thenReturn(pendingState, runningState, completedState);
+
+ emrAddStepsTask.handle();
+ Assert.assertEquals(EXIT_CODE_SUCCESS, emrAddStepsTask.getExitStatusCode());
+ }
+
+ @Test
+ public void testHandleUserRequestTerminate() throws Exception {
+ when(step.getStatus()).thenReturn(pendingState, runningState, cancelledState);
+
+ emrAddStepsTask.handle();
+ Assert.assertEquals(EXIT_CODE_KILL, emrAddStepsTask.getExitStatusCode());
+ }
+
+ @Test
+ public void testHandleError() throws Exception {
+ when(step.getStatus()).thenReturn(pendingState, runningState, failedState);
+ emrAddStepsTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+
+ when(emrClient.addJobFlowSteps(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException());
+ emrAddStepsTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+ }
+
+ private EmrParameters buildEmrTaskParameters() {
+ EmrParameters emrParameters = new EmrParameters();
+ String stepsDefineJson;
+ try (InputStream i = this.getClass().getResourceAsStream("EmrAddStepsDefine.json")) {
+ assert i != null;
+ stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
+ emrParameters.setStepsDefineJson(stepsDefineJson);
+
+ return emrParameters;
+ }
+
+ private EmrParameters buildErrorEmrTaskParameters() {
+ EmrParameters emrParameters = new EmrParameters();
+ String stepsDefineJson;
+ try (InputStream i = this.getClass().getResourceAsStream("EmrErrorAddStepsDefine.json")) {
+ assert i != null;
+ stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
+ emrParameters.setStepsDefineJson(stepsDefineJson);
+
+ return emrParameters;
+ }
+}
\ No newline at end of file
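The tests above stub `step.getStatus()` to return a sequence of states and expect `COMPLETED` to end in `EXIT_CODE_SUCCESS`, `CANCELLED` in `EXIT_CODE_KILL` and `FAILED` in `EXIT_CODE_FAILURE`, with `PENDING` and `RUNNING` meaning "keep polling". That mapping can be sketched as below. `StepPollingSketch` is hypothetical, the nested `StepState` is a local mirror of a subset of the EMR step states (the real API also has `CANCEL_PENDING` and `INTERRUPTED`), and the exit-code values are placeholders rather than copies of `TaskConstants`.

```java
import java.util.Iterator;

/**
 * Hypothetical sketch of the step-state to exit-code mapping the tests expect.
 * StepState is a local mirror, not the AWS SDK enum; EXIT_CODE_* are placeholders.
 */
class StepPollingSketch {

    enum StepState { PENDING, RUNNING, COMPLETED, CANCELLED, FAILED }

    static final int EXIT_CODE_SUCCESS = 0;   // placeholder value
    static final int EXIT_CODE_FAILURE = -1;  // placeholder value
    static final int EXIT_CODE_KILL = 137;    // placeholder value

    /** Returns null while the step is still in progress, otherwise the final exit code. */
    static Integer exitCodeFor(StepState state) {
        switch (state) {
            case COMPLETED:
                return EXIT_CODE_SUCCESS;
            case CANCELLED:
                return EXIT_CODE_KILL;
            case FAILED:
                return EXIT_CODE_FAILURE;
            default:
                return null; // PENDING or RUNNING: keep polling
        }
    }

    /** Consumes observed states (one per poll) until a terminal state appears. */
    static int poll(Iterator<StepState> observedStates) {
        while (observedStates.hasNext()) {
            Integer exitCode = exitCodeFor(observedStates.next());
            if (exitCode != null) {
                return exitCode;
            }
            // A real task would sleep between describeStep calls here.
        }
        // No terminal state was observed.
        return EXIT_CODE_FAILURE;
    }
}
```

Feeding `poll` an iterator plays the same role as the chained `thenReturn(...)` stubs: each call yields the next observed state.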
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
similarity index 84%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
index 285078f21f..65c6c0c239 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
@@ -59,12 +59,12 @@ import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
AmazonElasticMapReduceClientBuilder.class,
- EmrTask.class,
+ EmrJobFlowTask.class,
AmazonElasticMapReduce.class,
JSONUtils.class
})
@PowerMockIgnore({"javax.*"})
-public class EmrTaskTest {
+public class EmrJobFlowTaskTest {
private final ClusterStatus startingStatus =
new ClusterStatus().withState(ClusterState.STARTING)
@@ -114,7 +114,7 @@ public class EmrTaskTest {
.withCode(ClusterStateChangeReasonCode.STEP_FAILURE)
);
- private EmrTask emrTask;
+ private EmrJobFlowTask emrJobFlowTask;
private AmazonElasticMapReduce emrClient;
private Cluster cluster;
@@ -123,14 +123,14 @@ public class EmrTaskTest {
String emrParameters = buildEmrTaskParameters();
 TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(emrParameters);
- emrTask = spy(new EmrTask(taskExecutionContext));
+ emrJobFlowTask = spy(new EmrJobFlowTask(taskExecutionContext));
// mock emrClient and behavior
emrClient = mock(AmazonElasticMapReduce.class);
RunJobFlowResult runJobFlowResult = mock(RunJobFlowResult.class);
when(emrClient.runJobFlow(any())).thenReturn(runJobFlowResult);
when(runJobFlowResult.getJobFlowId()).thenReturn("xx");
- doReturn(emrClient).when(emrTask, "createEmrClient");
+ doReturn(emrClient).when(emrJobFlowTask, "createEmrClient");
 DescribeClusterResult describeClusterResult = mock(DescribeClusterResult.class);
when(emrClient.describeCluster(any())).thenReturn(describeClusterResult);
@@ -138,7 +138,7 @@ public class EmrTaskTest {
cluster = mock(Cluster.class);
when(describeClusterResult.getCluster()).thenReturn(cluster);
- emrTask.init();
+ emrJobFlowTask.init();
}
@Test
@@ -146,8 +146,8 @@ public class EmrTaskTest {
 when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatingStatus);
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode());
}
@@ -155,32 +155,32 @@ public class EmrTaskTest {
public void testHandleAliveWhenNoSteps() throws Exception {
 when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, waitingStatus);
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testHandleUserRequestTerminate() throws Exception {
 when(cluster.getStatus()).thenReturn(startingStatus, userRequestTerminateStatus);
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_KILL, emrTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_KILL, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testHandleTerminatedWithError() throws Exception {
 when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatedWithErrorsStatus);
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testCanNotParseJson() throws Exception {
mockStatic(JSONUtils.class);
- when(emrTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error")));
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+ when(emrJobFlowTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error")));
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@Test
@@ -188,18 +188,18 @@ public class EmrTaskTest {
when(emrClient.describeCluster(any())).thenReturn(null);
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@Test
public void testRunJobFlowError() throws Exception {
 when(emrClient.runJobFlow(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException());
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
- emrTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
+ emrJobFlowTask.handle();
+ Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
@@ -212,6 +212,7 @@ public class EmrTaskTest {
} catch (Exception e) {
throw new RuntimeException(e);
}
+ emrParameters.setProgramType(ProgramType.RUN_JOB_FLOW);
emrParameters.setJobFlowDefineJson(jobFlowDefineJson);
return JSONUtils.toJsonString(emrParameters);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json
new file mode 100644
index 0000000000..a14a259a88
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json
@@ -0,0 +1,17 @@
+{
+ "JobFlowId": "j-3V628TKAERHP8",
+ "Steps": [
+ {
+ "Name": "calculate_pi",
+ "ActionOnFailure": "CONTINUE",
+ "HadoopJarStep": {
+ "Jar": "command-runner.jar",
+ "Args": [
+ "/usr/lib/spark/bin/run-example",
+ "SparkPi",
+ "15"
+ ]
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json
new file mode 100644
index 0000000000..46e81b13ba
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json
@@ -0,0 +1,29 @@
+{
+ "JobFlowId": "j-3V628TKAERHP8",
+ "Steps": [
+ {
+ "Name": "calculate_pi",
+ "ActionOnFailure": "CONTINUE",
+ "HadoopJarStep": {
+ "Jar": "command-runner.jar",
+ "Args": [
+ "/usr/lib/spark/bin/run-example",
+ "SparkPi",
+ "15"
+ ]
+ }
+ },
+ {
+ "Name": "calculate_pi",
+ "ActionOnFailure": "CONTINUE",
+ "HadoopJarStep": {
+ "Jar": "command-runner.jar",
+ "Args": [
+ "/usr/lib/spark/bin/run-example",
+ "SparkPi",
+ "15"
+ ]
+ }
+ }
+ ]
+}
\ No newline at end of file
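`testDefineJsonStepNotOne` feeds this two-step definition to `EmrAddStepsTask` and expects `EXIT_CODE_FAILURE`, which suggests the task accepts exactly one step per definition (plausibly because it tracks a single step ID through `describeStep`). A guard of that kind might look like the sketch below. `SingleStepGuard` and `StepsDefinition` are hypothetical; `StepsDefinition` stands in for a parsed `AddJobFlowStepsRequest` and is not an AWS SDK type.

```java
import java.util.List;

/**
 * Hypothetical sketch of the "exactly one step" guard implied by
 * testDefineJsonStepNotOne. StepsDefinition is a stand-in, not an AWS SDK type.
 */
class SingleStepGuard {

    static final class StepsDefinition {
        final String jobFlowId;
        final List<String> stepNames;

        StepsDefinition(String jobFlowId, List<String> stepNames) {
            this.jobFlowId = jobFlowId;
            this.stepNames = stepNames;
        }
    }

    /** Returns the single step name, or throws if the definition does not hold exactly one step. */
    static String requireSingleStep(StepsDefinition definition) {
        int count = definition.stepNames == null ? 0 : definition.stepNames.size();
        if (count != 1) {
            throw new IllegalArgumentException(
                    "expected exactly one step for job flow " + definition.jobFlowId + ", got " + count);
        }
        return definition.stepNames.get(0);
    }
}
```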
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index cf2cef1271..43cba0d3d6 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -607,6 +607,8 @@ export default {
required: 'required',
emr_flow_define_json: 'jobFlowDefineJson',
emr_flow_define_json_tips: 'Please enter the definition of the job flow.',
+ emr_steps_define_json: 'stepsDefineJson',
+ emr_steps_define_json_tips: 'Please enter the definition of the emr step.',
segment_separator: 'Segment Execution Separator',
segment_separator_tips: 'Please enter the segment execution separator',
zeppelin_note_id: 'zeppelinNoteId',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index ab2c9c71f4..3706771854 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -600,6 +600,8 @@ export default {
required: '必填',
emr_flow_define_json: 'jobFlowDefineJson',
emr_flow_define_json_tips: '请输入工作流定义',
+ emr_steps_define_json: 'stepsDefineJson',
+ emr_steps_define_json_tips: '请输入EMR步骤定义',
segment_separator: '分段执行符号',
segment_separator_tips: '请输入分段执行符号',
zeppelin_note_id: 'zeppelin_note_id',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
index a67b370d18..1446fc866a 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
@@ -17,14 +17,30 @@
import { useI18n } from 'vue-i18n'
import { useCustomParams } from '.'
import type { IJsonItem } from '../types'
+import { computed } from 'vue'
export function useEmr(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
+ const jobFlowDefineJsonSpan = computed(() => (model.programType === 'RUN_JOB_FLOW' ? 24 : 0))
+
+ const stepsDefineJsonSpan = computed(() => (model.programType === 'ADD_JOB_FLOW_STEPS' ? 24 : 0))
+
return [
+ {
+ type: 'select',
+ field: 'programType',
+ span: 24,
+ name: t('project.node.program_type'),
+ options: PROGRAM_TYPES,
+ validate: {
+ required: true
+ }
+ },
{
type: 'editor',
field: 'jobFlowDefineJson',
+ span: jobFlowDefineJsonSpan,
name: t('project.node.emr_flow_define_json'),
props: {
language: 'json'
@@ -35,6 +51,20 @@ export function useEmr(model: { [field: string]: any }): IJsonItem[] {
message: t('project.node.emr_flow_define_json_tips')
}
},
+ {
+ type: 'editor',
+ field: 'stepsDefineJson',
+ span: stepsDefineJsonSpan,
+ name: t('project.node.emr_steps_define_json'),
+ props: {
+ language: 'json'
+ },
+ validate: {
+ trigger: ['input', 'trigger'],
+ required: true,
+ message: t('project.node.emr_steps_define_json_tips')
+ }
+ },
...useCustomParams({
model,
field: 'localParams',
@@ -42,3 +72,14 @@ export function useEmr(model: { [field: string]: any }): IJsonItem[] {
})
]
}
+
+export const PROGRAM_TYPES = [
+ {
+ label: 'RUN_JOB_FLOW',
+ value: 'RUN_JOB_FLOW'
+ },
+ {
+ label: 'ADD_JOB_FLOW_STEPS',
+ value: 'ADD_JOB_FLOW_STEPS'
+ }
+]
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index c90d1660db..4ab18ccd05 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -321,7 +321,9 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'EMR') {
taskParams.type = data.type
+ taskParams.programType = data.programType
taskParams.jobFlowDefineJson = data.jobFlowDefineJson
+ taskParams.stepsDefineJson = data.stepsDefineJson
}
if (data.taskType === 'ZEPPELIN') {
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
index 1face687fb..36dbbefa79 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
@@ -44,6 +44,7 @@ export function useEmr({
workerGroup: 'default',
delayTime: 0,
timeout: 30,
+ programType: 'ADD_JOB_FLOW_STEPS',
timeoutNotifyStrategy: ['WARN']
} as INodeData)
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 5019f29ce1..187ff7532f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -295,6 +295,7 @@ interface ITaskParams {
ruleId?: number
ruleInputParameter?: IRuleParameters
jobFlowDefineJson?: string
+ stepsDefineJson?: string
zeppelinNoteId?: string
zeppelinParagraphId?: string
noteId?: string