This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8eaf5a2309 [Feature-10219][EMR] EMR supports use <add-Steps> to add 
steps to an existing cluster (#10657)
8eaf5a2309 is described below

commit 8eaf5a230998946001b80c9f373da844e6efcd21
Author: ZhaoGuodong <[email protected]>
AuthorDate: Sun Jul 10 10:15:25 2022 +0800

    [Feature-10219][EMR] EMR supports use <add-Steps> to add steps to an 
existing cluster (#10657)
    
    * Add the ProgramType parameter to distinguish task types
    * EmrAddStepsTask supports Add-Steps
    * UI supports Add-Steps
    * EmrTask modify the name of the class to EmrJobFlowTask
    * add ERM Task abstract base class AbstractEmrTask
    * add testcase for EmrAddStepsTask
    * Modifying help Documents
---
 docs/docs/en/guide/task/emr.md                     |  48 ++++-
 docs/docs/zh/guide/task/emr.md                     |  47 ++++-
 docs/img/tasks/demo/emr_add_job_flow_steps.png     | Bin 0 -> 131722 bytes
 docs/img/tasks/demo/emr_jobFlowId.png              | Bin 0 -> 133514 bytes
 docs/img/tasks/demo/emr_run_job_flow.png           | Bin 0 -> 139410 bytes
 .../plugin/task/emr/AbstractEmrTask.java           | 113 ++++++++++++
 .../plugin/task/emr/EmrAddStepsTask.java           | 177 ++++++++++++++++++
 .../task/emr/{EmrTask.java => EmrJobFlowTask.java} |  86 +--------
 .../plugin/task/emr/EmrParameters.java             |  39 ++--
 .../plugin/task/emr/EmrTaskChannel.java            |  10 +-
 .../plugin/task/emr/ProgramType.java               |  40 ++---
 .../plugin/task/emr/EmrAddStepsTaskTest.java       | 198 +++++++++++++++++++++
 .../{EmrTaskTest.java => EmrJobFlowTaskTest.java}  |  47 ++---
 .../plugin/task/emr/EmrAddStepsDefine.json         |  17 ++
 .../plugin/task/emr/EmrErrorAddStepsDefine.json    |  29 +++
 dolphinscheduler-ui/src/locales/en_US/project.ts   |   2 +
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |   2 +
 .../task/components/node/fields/use-emr.ts         |  41 +++++
 .../projects/task/components/node/format-data.ts   |   2 +
 .../projects/task/components/node/tasks/use-emr.ts |   1 +
 .../views/projects/task/components/node/types.ts   |   1 +
 21 files changed, 750 insertions(+), 150 deletions(-)

diff --git a/docs/docs/en/guide/task/emr.md b/docs/docs/en/guide/task/emr.md
index 050d7c2397..ebcaa885aa 100644
--- a/docs/docs/en/guide/task/emr.md
+++ b/docs/docs/en/guide/task/emr.md
@@ -2,7 +2,11 @@
 
 ## Overview
 
-Amazon EMR task type, for creating EMR clusters on AWS and running computing 
tasks. Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the 
background code, to transfer JSON parameters to  
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 object and submit to AWS.
+Amazon EMR task type, for operation EMR clusters on AWS and running computing 
tasks. 
+Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the 
background code, to transfer JSON parameters to task object and submit to AWS, 
Two program types are currently supported:
+
+* `RUN_JOB_FLOW` Using 
[API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
 submit 
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 object
+* `ADD_JOB_FLOW_STEPS` Using 
[API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples)
 submit 
[AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html)
 object
 
 ## Create Task
 
@@ -19,12 +23,18 @@ Amazon EMR task type, for creating EMR clusters on AWS and 
running computing tas
 | Task priority | When the number of worker threads is insufficient, execute 
in the order of priority from high to low, and tasks with the same priority 
will execute in a first-in first-out order. |
 | Worker grouping | Assign tasks to the machines of the worker group to 
execute. If `Default` is selected, randomly select a worker machine for 
execution. |
 | Times of failed retry attempts | The number of times the task failed to 
resubmit. You can select from drop-down or fill-in a number. |
-| Failed retry interval: The time interval for resubmitting the task after a 
failed task. You can select from drop-down or fill-in a number. |
+| Failed retry interval | The time interval for resubmitting the task after a 
failed task. You can select from drop-down or fill-in a number. |
 | Timeout alarm | Check the timeout alarm and timeout failure. When the task 
runs exceed the "timeout", an alarm email will send and the task execution will 
fail. |
-| JSON | JSON corresponding to the 
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 object, for details refer to 
[API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples).
 |
+| Program Type | Select the program type. If it is `RUN_JOB_FLOW`, you need to 
fill in `jobFlowDefineJson`, if it is `ADD_JOB_FLOW_STEPS`, you need to fill in 
`stepsDefineJson`. |
+| jobFlowDefineJson | JSON corresponding to the 
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 object, for details refer to 
[API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples).
 |
+| stepsDefineJson | JSON corresponding to the 
[AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html)
 object, for details refer to 
[API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples).
 |
 
-## JSON example
+## Task Example
+### Create an EMR cluster and run Steps
+This example shows how to create an `EMR` task node of type `RUN_JOB_FLOW`. 
Taking the execution of `SparkPi` as an example, the task will create an `EMR` 
cluster and execute the `SparkPi` sample program.
+![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_run_job_flow.png)
 
+jobFlowDefineJson example
 ```json
 {
   "Name": "SparkPi",
@@ -65,3 +75,33 @@ Amazon EMR task type, for creating EMR clusters on AWS and 
running computing tas
 }
 ```
 
+### Add a Step to a Running EMR Cluster
+This example shows how to create an `EMR` task node of type 
`ADD_JOB_FLOW_STEPS`. Taking the execution of `SparkPi` as an example, the task 
will add a `SparkPi` sample program to the running `EMR` cluster.
+![ADD_JOB_FLOW_STEPS](../../../../img/tasks/demo/emr_add_job_flow_steps.png)
+![JobFlowId](../../../../img/tasks/demo/emr_jobFlowId.png)
+
+stepsDefineJson example
+```json
+{
+  "JobFlowId": "j-3V628TKAERHP8",
+  "Steps": [
+    {
+      "Name": "calculate_pi",
+      "ActionOnFailure": "CONTINUE",
+      "HadoopJarStep": {
+        "Jar": "command-runner.jar",
+        "Args": [
+          "/usr/lib/spark/bin/run-example",
+          "SparkPi",
+          "15"
+        ]
+      }
+    }
+  ]
+}
+```
+
+## Notice
+
+- Failover on EMR Task type has not been implemented. In this time, 
DolphinScheduler only supports failover on yarn task type . Other task type, 
such as EMR task, k8s task not ready yet.
+- `stepsDefineJson` A task definition only supports the association of a 
single step, which can better ensure the reliability of the task state.
diff --git a/docs/docs/zh/guide/task/emr.md b/docs/docs/zh/guide/task/emr.md
index dfa17f6658..59dab0046e 100644
--- a/docs/docs/zh/guide/task/emr.md
+++ b/docs/docs/zh/guide/task/emr.md
@@ -2,7 +2,11 @@
 
 ## 综述
 
-Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。 
后台使用[aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 
将json参数转换为[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 对象,提交到AWS
+Amazon EMR 任务类型,用于在AWS上操作EMR集群并执行计算任务。 
+后台使用 [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 
将JSON参数转换为任务对象,提交到AWS,目前支持两种程序类型:
+
+* `RUN_JOB_FLOW` 使用 
[API_RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
 提交 
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 对象
+* `ADD_JOB_FLOW_STEPS` 使用 
[API_AddJobFlowSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples)
 提交 
[AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html)
 对象
 
 ## 任务参数
 - 节点名称:一个工作流定义中的节点名称是唯一的。
@@ -13,9 +17,16 @@ Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。
 - 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
 - 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
 - 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
-- json: 
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 对象对应的json,详细json定义参见 
[API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
+- 
程序类型:选择程序类型,如果是`RUN_JOB_FLOW`,则需要填写`jobFlowDefineJson`,如果是`ADD_JOB_FLOW_STEPS`,则需要填写`stepsDefineJson`。
+  - jobFlowDefineJson: 
[RunJobFlowRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/RunJobFlowRequest.html)
 对象对应的JSON,详细JSON定义参见 
[API_RunJobFlow_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples)
+  - 
stepsDefineJson:[AddJobFlowStepsRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticmapreduce/model/AddJobFlowStepsRequest.html)
 对象对应的JSON,详细JSON定义参见 
[API_AddJobFlowSteps_Examples](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples)
+
+## 任务样例
+### 创建EMR集群并运行Steps
+该样例展示了如何创建`RUN_JOB_FLOW`类型`EMR`任务节点,以执行`SparkPi`为例,该任务会创建一个`EMR`集群,并且执行`SparkPi`示例程序。
+![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_run_job_flow.png)
 
-## json参数样例
+jobFlowDefineJson 参数样例
 ```json
 {
   "Name": "SparkPi",
@@ -56,3 +67,33 @@ Amazon EMR任务类型,用于在AWS上创建EMR集群并执行计算任务。
 }
 ```
 
+### 向运行中的EMR集群添加Step
+该样例展示了如何创建`ADD_JOB_FLOW_STEPS`类型`EMR`任务节点,以执行`SparkPi`为例,该任务会向运行中的`EMR`集群添加一个`SparkPi`示例程序。
+![ADD_JOB_FLOW_STEPS](../../../../img/tasks/demo/emr_add_job_flow_steps.png)
+![JobFlowId](../../../../img/tasks/demo/emr_jobFlowId.png)
+
+stepsDefineJson 参数样例
+```json
+{
+  "JobFlowId": "j-3V628TKAERHP8",
+  "Steps": [
+    {
+      "Name": "calculate_pi",
+      "ActionOnFailure": "CONTINUE",
+      "HadoopJarStep": {
+        "Jar": "command-runner.jar",
+        "Args": [
+          "/usr/lib/spark/bin/run-example",
+          "SparkPi",
+          "15"
+        ]
+      }
+    }
+  ]
+}
+```
+
+## 注意事项:
+
+- EMR 任务类型的故障转移尚未实现。目前,DolphinScheduler 仅支持对 yarn task type 进行故障转移。其他任务类型,如 
EMR 任务、k8s 任务尚未准备好。 
+- `stepsDefineJson` 一个任务定义仅支持关联单个step,这样可以更好的保证任务状态的可靠性。
\ No newline at end of file
diff --git a/docs/img/tasks/demo/emr_add_job_flow_steps.png 
b/docs/img/tasks/demo/emr_add_job_flow_steps.png
new file mode 100644
index 0000000000..bc8de30f72
Binary files /dev/null and b/docs/img/tasks/demo/emr_add_job_flow_steps.png 
differ
diff --git a/docs/img/tasks/demo/emr_jobFlowId.png 
b/docs/img/tasks/demo/emr_jobFlowId.png
new file mode 100644
index 0000000000..b79097afd5
Binary files /dev/null and b/docs/img/tasks/demo/emr_jobFlowId.png differ
diff --git a/docs/img/tasks/demo/emr_run_job_flow.png 
b/docs/img/tasks/demo/emr_run_job_flow.png
new file mode 100644
index 0000000000..ad09c5f7c9
Binary files /dev/null and b/docs/img/tasks/demo/emr_run_job_flow.png differ
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
new file mode 100644
index 0000000000..329c3bc3d5
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.emr;
+
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static 
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import java.util.TimeZone;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
+import 
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+/**
+ * ERM Task abstract base class
+ *
+ * @since v3.1.0
+ */
+public abstract class AbstractEmrTask extends AbstractTaskExecutor {
+
+    final TaskExecutionContext taskExecutionContext;
+    EmrParameters emrParameters;
+    AmazonElasticMapReduce emrClient;
+    String clusterId;
+
+
+    /**
+     * config ObjectMapper features and propertyNamingStrategy
+     * use UpperCamelCaseStrategy support capital letters parse
+     *
+     * @see PropertyNamingStrategy.UpperCamelCaseStrategy
+     */
+    static final ObjectMapper objectMapper = new ObjectMapper()
+        .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+        .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+        .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+        .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+        .setTimeZone(TimeZone.getDefault())
+        .setPropertyNamingStrategy(new 
PropertyNamingStrategy.UpperCamelCaseStrategy());
+
+    /**
+     * constructor
+     *
+     * @param taskExecutionContext taskExecutionContext
+     */
+    protected AbstractEmrTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
+    @Override
+    public void init() {
+        final String taskParams = taskExecutionContext.getTaskParams();
+        logger.info("emr task params:{}", taskParams);
+        emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
+        if (emrParameters == null || !emrParameters.checkParameters()) {
+            throw new EmrTaskException("emr task params is not valid");
+        }
+        emrClient = createEmrClient();
+    }
+
+    @Override
+    public AbstractParameters getParameters() {
+        return emrParameters;
+    }
+
+    /**
+     * create emr client from BasicAWSCredentials
+     *
+     * @return AmazonElasticMapReduce
+     */
+    private AmazonElasticMapReduce createEmrClient() {
+
+        final String awsAccessKeyId = 
PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String awsSecretAccessKey = 
PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String awsRegion = 
PropertyUtils.getString(TaskConstants.AWS_REGION);
+        final BasicAWSCredentials basicAWSCredentials = new 
BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new 
AWSStaticCredentialsProvider(basicAWSCredentials);
+        // create an EMR client
+        return AmazonElasticMapReduceClientBuilder.standard()
+            .withCredentials(awsCredentialsProvider)
+            .withRegion(awsRegion)
+            .build();
+    }
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
new file mode 100644
index 0000000000..d747577b71
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.emr;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
+import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsInfo;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequest;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequestStatus;
+import com.amazonaws.services.elasticmapreduce.model.CancelStepsResult;
+import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
+import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
+import com.amazonaws.services.elasticmapreduce.model.StepState;
+import com.amazonaws.services.elasticmapreduce.model.StepStatus;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Sets;
+
+/**
+ * AddJobFlowSteps task executor
+ *
+ * @since v3.1.0
+ */
+public class EmrAddStepsTask extends AbstractEmrTask {
+
+    private String stepId;
+
+    private final HashSet<String> waitingStateSet = Sets.newHashSet(
+        StepState.PENDING.toString(),
+        StepState.CANCEL_PENDING.toString(),
+        StepState.RUNNING.toString()
+    );
+
+    /**
+     * constructor
+     *
+     * @param taskExecutionContext taskExecutionContext
+     */
+    protected EmrAddStepsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+    }
+
+    @Override
+    public void handle() throws InterruptedException {
+        StepStatus stepStatus = null;
+        try {
+            AddJobFlowStepsRequest addJobFlowStepsRequest = 
createAddJobFlowStepsRequest();
+
+            // submit addJobFlowStepsRequest to aws
+            AddJobFlowStepsResult result = 
emrClient.addJobFlowSteps(addJobFlowStepsRequest);
+
+            clusterId = addJobFlowStepsRequest.getJobFlowId();
+            stepId = result.getStepIds().get(0);
+            // use clusterId-stepId as appIds
+            setAppIds(clusterId + TaskConstants.SUBTRACT_STRING + stepId);
+
+            stepStatus = getStepStatus();
+
+            while (waitingStateSet.contains(stepStatus.getState())) {
+                TimeUnit.SECONDS.sleep(10);
+                stepStatus = getStepStatus();
+            }
+
+        } catch (EmrTaskException | SdkBaseException e) {
+            logger.error("emr task submit failed with error", e);
+        } finally {
+            final int exitStatusCode = calculateExitStatusCode(stepStatus);
+            setExitStatusCode(exitStatusCode);
+            logger.info("emr task finished with step status : {}", stepStatus);
+        }
+    }
+
+    /**
+     * parse json string to AddJobFlowStepsRequest
+     *
+     * @return AddJobFlowStepsRequest
+     */
+    private AddJobFlowStepsRequest createAddJobFlowStepsRequest() {
+
+        final AddJobFlowStepsRequest addJobFlowStepsRequest;
+        try {
+            addJobFlowStepsRequest = 
objectMapper.readValue(emrParameters.getStepsDefineJson(), 
AddJobFlowStepsRequest.class);
+        } catch (JsonProcessingException e) {
+            throw new EmrTaskException("can not parse AddJobFlowStepsRequest 
from json", e);
+        }
+
+        // When a single task definition is associated with multiple steps, 
the state tracking will have high complexity.
+        // Therefore, A task definition only supports the association of a 
single step, which can better ensure the reliability of the task state.
+        if (addJobFlowStepsRequest.getSteps().size() > 1) {
+            throw new EmrTaskException("ds emr addJobFlowStepsTask only 
support one step");
+        }
+
+        return addJobFlowStepsRequest;
+    }
+
+    /**
+     * calculate task exitStatusCode
+     *
+     * @param stepStatus aws emr execution status details of the cluster step.
+     * @return exitStatusCode
+     */
+    private int calculateExitStatusCode(StepStatus stepStatus) {
+        if (stepStatus == null) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        } else {
+            String state = stepStatus.getState();
+            StepState stepState = StepState.valueOf(state);
+            switch (stepState) {
+                case COMPLETED:
+                    return TaskConstants.EXIT_CODE_SUCCESS;
+                case CANCELLED:
+                    return TaskConstants.EXIT_CODE_KILL;
+                default:
+                    return TaskConstants.EXIT_CODE_FAILURE;
+            }
+        }
+
+    }
+
+    private StepStatus getStepStatus() {
+        DescribeStepRequest describeStepRequest = new 
DescribeStepRequest().withClusterId(clusterId).withStepId(stepId);
+        DescribeStepResult result = 
emrClient.describeStep(describeStepRequest);
+        if (result == null) {
+            throw new EmrTaskException("fetch step status failed");
+        }
+        StepStatus stepStatus = result.getStep().getStatus();
+        logger.info("emr step [clusterId:{}, stepId:{}] running with 
status:{}", clusterId, stepId, stepStatus);
+        return stepStatus;
+
+    }
+
+    @Override
+    public void cancelApplication(boolean status) throws Exception {
+        super.cancelApplication(status);
+        logger.info("trying cancel emr step, taskId:{}, clusterId:{}, 
stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
+        CancelStepsRequest cancelStepsRequest = new 
CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
+        CancelStepsResult cancelStepsResult = 
emrClient.cancelSteps(cancelStepsRequest);
+
+        if (cancelStepsResult == null) {
+            throw new EmrTaskException("cancel emr step failed");
+        }
+
+        CancelStepsInfo cancelEmrStepInfo = 
cancelStepsResult.getCancelStepsInfoList()
+            .stream()
+            .filter(cancelStepsInfo -> 
cancelStepsInfo.getStepId().equals(stepId))
+            .findFirst()
+            .orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
+
+        if 
(CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus()))
 {
+            throw new EmrTaskException("cancel emr step failed, message:" + 
cancelEmrStepInfo.getReason());
+        }
+
+        logger.info("the result of cancel emr step is:{}", cancelStepsResult);
+    }
+
+}
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
similarity index 61%
rename from 
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java
rename to 
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index ada0041bf9..ed42de2831 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -17,28 +17,13 @@
 
 package org.apache.dolphinscheduler.plugin.task.emr;
 
-import static 
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
-import static 
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
-import static 
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
-import static 
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
-
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import 
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
 
 import java.util.HashSet;
-import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.SdkBaseException;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
-import 
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
 import com.amazonaws.services.elasticmapreduce.model.ClusterState;
 import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
 import 
com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReasonCode;
@@ -50,23 +35,9 @@ import 
com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
 import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;
 import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 import com.google.common.collect.Sets;
 
-public class EmrTask extends AbstractTaskExecutor {
-
-    /**
-     * taskExecutionContext
-     */
-    private final TaskExecutionContext taskExecutionContext;
-    /**
-     * emr parameters
-     */
-    private EmrParameters emrParameters;
-    private AmazonElasticMapReduce emrClient;
-
-    private String clusterId;
+public class EmrJobFlowTask extends AbstractEmrTask {
 
     private final HashSet<String> waitingStateSet = Sets.newHashSet(
         ClusterState.STARTING.toString(),
@@ -74,40 +45,13 @@ public class EmrTask extends AbstractTaskExecutor {
         ClusterState.RUNNING.toString()
     );
 
-    /**
-     * config ObjectMapper features and propertyNamingStrategy
-     * use UpperCamelCaseStrategy support capital letters parse
-     * @see PropertyNamingStrategy.UpperCamelCaseStrategy
-     */
-    private static final ObjectMapper objectMapper = new ObjectMapper()
-        .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
-        .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
-        .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
-        .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
-        .setTimeZone(TimeZone.getDefault())
-        .setPropertyNamingStrategy(new 
PropertyNamingStrategy.UpperCamelCaseStrategy());
-
     /**
      * constructor
      *
      * @param taskExecutionContext taskExecutionContext
      */
-    protected EmrTask(TaskExecutionContext taskExecutionContext) {
-
+    protected EmrJobFlowTask(TaskExecutionContext taskExecutionContext) {
         super(taskExecutionContext);
-        this.taskExecutionContext = taskExecutionContext;
-    }
-
-    @Override
-    public void init() {
-
-        final String taskParams = taskExecutionContext.getTaskParams();
-        logger.info("emr task params:{}", taskParams);
-        emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
-        if (emrParameters == null || !emrParameters.checkParameters()) {
-            throw new EmrTaskException("emr task params is not valid");
-        }
-        emrClient = createEmrClient();
     }
 
     @Override
@@ -120,7 +64,7 @@ public class EmrTask extends AbstractTaskExecutor {
             RunJobFlowResult result = emrClient.runJobFlow(runJobFlowRequest);
 
             clusterId = result.getJobFlowId();
-            // TODO: Failover on EMR Task type has not been implemented. In 
this time, DS only supports failover on yarn task type . Other task type, such 
as EMR task, k8s task not ready yet.
+            // Failover on EMR Task type has not been implemented. In this 
time, DS only supports failover on yarn task type . Other task type, such as 
EMR task, k8s task not ready yet.
             setAppIds(clusterId);
 
             clusterStatus = getClusterStatus();
@@ -199,30 +143,6 @@ public class EmrTask extends AbstractTaskExecutor {
 
     }
 
-    @Override
-    public AbstractParameters getParameters() {
-        return emrParameters;
-    }
-
-    /**
-     * create emr client from BasicAWSCredentials
-     *
-     * @return AmazonElasticMapReduce
-     */
-    private AmazonElasticMapReduce createEmrClient() {
-
-        final String awsAccessKeyId = 
PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
-        final String awsSecretAccessKey = 
PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
-        final String awsRegion = 
PropertyUtils.getString(TaskConstants.AWS_REGION);
-        final BasicAWSCredentials basicAWSCredentials = new 
BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
-        final AWSCredentialsProvider awsCredentialsProvider = new 
AWSStaticCredentialsProvider(basicAWSCredentials);
-        // create an EMR client
-        return AmazonElasticMapReduceClientBuilder.standard()
-            .withCredentials(awsCredentialsProvider)
-            .withRegion(awsRegion)
-            .build();
-    }
-
     @Override
     public void cancelApplication(boolean status) throws Exception {
         super.cancelApplication(status);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
index 14396fcebc..1e38a66d81 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrParameters.java
@@ -24,18 +24,41 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
 import java.util.Collections;
 import java.util.List;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
 public class EmrParameters extends AbstractParameters {
 
+    /**
+     * emr program type
+     * 0 RUN_JOB_FLOW, 1 ADD_JOB_FLOW_STEPS
+     */
+    private ProgramType programType;
+
     /**
      * job flow define in json format
+     *
      * @see <a 
href="https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html#API_RunJobFlow_Examples";>API_RunJobFlow_Examples</a>
      */
     private String jobFlowDefineJson;
 
+    /**
+     * steps define in json format
+     *
+     * @see <a 
href="https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#API_AddJobFlowSteps_Examples";>API_AddJobFlowSteps_Examples</a>
+     */
+    private String stepsDefineJson;
+
     @Override
     public boolean checkParameters() {
-
-        return StringUtils.isNotEmpty(jobFlowDefineJson);
+        /*
+         * When saving a task, the programType cannot be empty and 
jobFlowDefineJson or stepsDefineJson cannot be empty:
+         * (1) When ProgramType is RUN_JOB_FLOW, jobFlowDefineJson cannot be 
empty.
+         * (2) When ProgramType is ADD_JOB_FLOW_STEPS, stepsDefineJson cannot 
be empty.
+         */
+        return programType != null && 
(StringUtils.isNotEmpty(jobFlowDefineJson) || 
StringUtils.isNotEmpty(stepsDefineJson));
     }
 
     @Override
@@ -44,18 +67,12 @@ public class EmrParameters extends AbstractParameters {
 
     }
 
-    public String getJobFlowDefineJson() {
-        return jobFlowDefineJson;
-    }
-
-    public void setJobFlowDefineJson(String jobFlowDefineJson) {
-        this.jobFlowDefineJson = jobFlowDefineJson;
-    }
-
     @Override
     public String toString() {
         return "EmrParameters{"
-            + "jobFlowDefineJson='" + jobFlowDefineJson + '\''
+            + "programType=" + programType
+            + ", jobFlowDefineJson='" + jobFlowDefineJson + '\''
+            + ", stepsDefineJson='" + stepsDefineJson + '\''
             + '}';
     }
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
index 898362f2cc..e59b4e3613 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskChannel.java
@@ -33,7 +33,15 @@ public class EmrTaskChannel implements TaskChannel {
 
     @Override
     public AbstractTask createTask(TaskExecutionContext taskRequest) {
-        return new EmrTask(taskRequest);
+        EmrParameters emrParameters = 
JSONUtils.parseObject(taskRequest.getTaskParams(), EmrParameters.class);
+        assert emrParameters != null;
+        if (ProgramType.RUN_JOB_FLOW.equals(emrParameters.getProgramType())) {
+            return new EmrJobFlowTask(taskRequest);
+        } else if 
(ProgramType.ADD_JOB_FLOW_STEPS.equals(emrParameters.getProgramType())) {
+            return new EmrAddStepsTask(taskRequest);
+        } else {
+            throw new IllegalArgumentException("Unsupported program type: " + 
emrParameters.getProgramType());
+        }
     }
 
     @Override
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java
similarity index 55%
copy from 
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
copy to 
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java
index a67b370d18..0f0870c119 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/ProgramType.java
@@ -14,31 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { useI18n } from 'vue-i18n'
-import { useCustomParams } from '.'
-import type { IJsonItem } from '../types'
 
-export function useEmr(model: { [field: string]: any }): IJsonItem[] {
-  const { t } = useI18n()
+package org.apache.dolphinscheduler.plugin.task.emr;
 
-  return [
-    {
-      type: 'editor',
-      field: 'jobFlowDefineJson',
-      name: t('project.node.emr_flow_define_json'),
-      props: {
-        language: 'json'
-      },
-      validate: {
-        trigger: ['input', 'trigger'],
-        required: true,
-        message: t('project.node.emr_flow_define_json_tips')
-      }
-    },
-    ...useCustomParams({
-      model,
-      field: 'localParams',
-      isSimple: true
-    })
-  ]
+/**
+ * emr program type
+ *
+ * @since v3.1.0
+ */
+public enum ProgramType {
+    /**
+     * RunJobFlow
+     */
+    RUN_JOB_FLOW,
+    /**
+     * AddJobFlowSteps
+     */
+    ADD_JOB_FLOW_STEPS
 }
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
new file mode 100644
index 0000000000..d74d36fa82
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.emr;
+
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
+import 
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
+import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
+import 
com.amazonaws.services.elasticmapreduce.model.AmazonElasticMapReduceException;
+import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
+import com.amazonaws.services.elasticmapreduce.model.Step;
+import com.amazonaws.services.elasticmapreduce.model.StepState;
+import com.amazonaws.services.elasticmapreduce.model.StepStatus;
+
+/**
+ * EmrAddStepsTask Test
+ *
+ * @since v3.1.0
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    AmazonElasticMapReduceClientBuilder.class,
+    EmrAddStepsTask.class,
+    AmazonElasticMapReduce.class,
+    JSONUtils.class
+})
+@PowerMockIgnore({"javax.*"})
+public class EmrAddStepsTaskTest {
+
+    private final StepStatus pendingState =
+        new StepStatus().withState(StepState.PENDING);
+
+    private final StepStatus runningState =
+        new StepStatus().withState(StepState.RUNNING);
+
+    private final StepStatus completedState =
+        new StepStatus().withState(StepState.COMPLETED);
+
+    private final StepStatus cancelledState =
+        new StepStatus().withState(StepState.CANCELLED);
+
+    private final StepStatus failedState =
+        new StepStatus().withState(StepState.FAILED);
+
+    private EmrAddStepsTask emrAddStepsTask;
+    private AmazonElasticMapReduce emrClient;
+    private Step step;
+
+    @Before
+    public void before() throws Exception {
+        // mock EmrParameters and EmrAddStepsTask
+        EmrParameters emrParameters = buildEmrTaskParameters();
+        String emrParametersString = JSONUtils.toJsonString(emrParameters);
+        TaskExecutionContext taskExecutionContext = 
PowerMockito.mock(TaskExecutionContext.class);
+        
when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString);
+        emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
+
+        // mock emrClient and behavior
+        emrClient = mock(AmazonElasticMapReduce.class);
+
+        AddJobFlowStepsResult addJobFlowStepsResult = 
mock(AddJobFlowStepsResult.class);
+        
when(emrClient.addJobFlowSteps(any())).thenReturn(addJobFlowStepsResult);
+        
when(addJobFlowStepsResult.getStepIds()).thenReturn(Collections.singletonList("step-xx"));
+
+        doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
+        DescribeStepResult describeStepResult = mock(DescribeStepResult.class);
+        when(emrClient.describeStep(any())).thenReturn(describeStepResult);
+
+        // mock step
+        step = mock(Step.class);
+        when(describeStepResult.getStep()).thenReturn(step);
+
+        emrAddStepsTask.init();
+    }
+
+    @Test
+    public void testCanNotParseJson() throws Exception {
+        mockStatic(JSONUtils.class);
+        when(emrAddStepsTask, "createAddJobFlowStepsRequest").thenThrow(new 
EmrTaskException("can not parse AddJobFlowStepsRequest from json", new 
Exception("error")));
+        emrAddStepsTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrAddStepsTask.getExitStatusCode());
+    }
+
+    @Test
+    public void testDefineJsonStepNotOne() throws Exception {
+        // mock EmrParameters and EmrAddStepsTask
+        EmrParameters emrParameters = buildErrorEmrTaskParameters();
+        String emrParametersString = JSONUtils.toJsonString(emrParameters);
+        TaskExecutionContext taskExecutionContext = 
PowerMockito.mock(TaskExecutionContext.class);
+        
when(taskExecutionContext.getTaskParams()).thenReturn(emrParametersString);
+        emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
+        doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
+        emrAddStepsTask.init();
+        emrAddStepsTask.handle();
+
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrAddStepsTask.getExitStatusCode());
+    }
+
+    @Test
+    public void testHandle() throws Exception {
+        when(step.getStatus()).thenReturn(pendingState, runningState, 
completedState);
+
+        emrAddStepsTask.handle();
+        Assert.assertEquals(EXIT_CODE_SUCCESS, 
emrAddStepsTask.getExitStatusCode());
+    }
+
+    @Test
+    public void testHandleUserRequestTerminate() throws Exception {
+        when(step.getStatus()).thenReturn(pendingState, runningState, 
cancelledState);
+
+        emrAddStepsTask.handle();
+        Assert.assertEquals(EXIT_CODE_KILL, 
emrAddStepsTask.getExitStatusCode());
+    }
+
+    @Test
+    public void testHandleError() throws Exception {
+        when(step.getStatus()).thenReturn(pendingState, runningState, 
failedState);
+        emrAddStepsTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrAddStepsTask.getExitStatusCode());
+
+        when(emrClient.addJobFlowSteps(any())).thenThrow(new 
AmazonElasticMapReduceException("error"), new EmrTaskException());
+        emrAddStepsTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrAddStepsTask.getExitStatusCode());
+    }
+
+    private EmrParameters buildEmrTaskParameters() {
+        EmrParameters emrParameters = new EmrParameters();
+        String stepsDefineJson;
+        try (InputStream i = 
this.getClass().getResourceAsStream("EmrAddStepsDefine.json")) {
+            assert i != null;
+            stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
+        emrParameters.setStepsDefineJson(stepsDefineJson);
+
+        return emrParameters;
+    }
+
+    private EmrParameters buildErrorEmrTaskParameters() {
+        EmrParameters emrParameters = new EmrParameters();
+        String stepsDefineJson;
+        try (InputStream i = 
this.getClass().getResourceAsStream("EmrErrorAddStepsDefine.json")) {
+            assert i != null;
+            stepsDefineJson = IOUtils.toString(i, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        emrParameters.setProgramType(ProgramType.ADD_JOB_FLOW_STEPS);
+        emrParameters.setStepsDefineJson(stepsDefineJson);
+
+        return emrParameters;
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
similarity index 84%
rename from 
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java
rename to 
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
index 285078f21f..65c6c0c239 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrTaskTest.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
@@ -59,12 +59,12 @@ import 
com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({
     AmazonElasticMapReduceClientBuilder.class,
-    EmrTask.class,
+    EmrJobFlowTask.class,
     AmazonElasticMapReduce.class,
     JSONUtils.class
 })
 @PowerMockIgnore({"javax.*"})
-public class EmrTaskTest {
+public class EmrJobFlowTaskTest {
 
     private final ClusterStatus startingStatus =
         new ClusterStatus().withState(ClusterState.STARTING)
@@ -114,7 +114,7 @@ public class EmrTaskTest {
                     .withCode(ClusterStateChangeReasonCode.STEP_FAILURE)
             );
 
-    private EmrTask emrTask;
+    private EmrJobFlowTask emrJobFlowTask;
     private AmazonElasticMapReduce emrClient;
     private Cluster cluster;
 
@@ -123,14 +123,14 @@ public class EmrTaskTest {
         String emrParameters = buildEmrTaskParameters();
         TaskExecutionContext taskExecutionContext = 
PowerMockito.mock(TaskExecutionContext.class);
         when(taskExecutionContext.getTaskParams()).thenReturn(emrParameters);
-        emrTask = spy(new EmrTask(taskExecutionContext));
+        emrJobFlowTask = spy(new EmrJobFlowTask(taskExecutionContext));
 
         // mock emrClient and behavior
         emrClient = mock(AmazonElasticMapReduce.class);
         RunJobFlowResult runJobFlowResult = mock(RunJobFlowResult.class);
         when(emrClient.runJobFlow(any())).thenReturn(runJobFlowResult);
         when(runJobFlowResult.getJobFlowId()).thenReturn("xx");
-        doReturn(emrClient).when(emrTask, "createEmrClient");
+        doReturn(emrClient).when(emrJobFlowTask, "createEmrClient");
         DescribeClusterResult describeClusterResult = 
mock(DescribeClusterResult.class);
         
when(emrClient.describeCluster(any())).thenReturn(describeClusterResult);
 
@@ -138,7 +138,7 @@ public class EmrTaskTest {
         cluster = mock(Cluster.class);
         when(describeClusterResult.getCluster()).thenReturn(cluster);
 
-        emrTask.init();
+        emrJobFlowTask.init();
     }
 
     @Test
@@ -146,8 +146,8 @@ public class EmrTaskTest {
 
         when(cluster.getStatus()).thenReturn(startingStatus, 
softwareConfigStatus, runningStatus, terminatingStatus);
 
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_SUCCESS, 
emrJobFlowTask.getExitStatusCode());
 
     }
 
@@ -155,32 +155,32 @@ public class EmrTaskTest {
     public void testHandleAliveWhenNoSteps() throws Exception {
         when(cluster.getStatus()).thenReturn(startingStatus, 
softwareConfigStatus, runningStatus, waitingStatus);
 
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_SUCCESS, emrTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_SUCCESS, 
emrJobFlowTask.getExitStatusCode());
     }
 
     @Test
     public void testHandleUserRequestTerminate() throws Exception {
         when(cluster.getStatus()).thenReturn(startingStatus, 
userRequestTerminateStatus);
 
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_KILL, emrTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_KILL, 
emrJobFlowTask.getExitStatusCode());
     }
 
     @Test
     public void testHandleTerminatedWithError() throws Exception {
         when(cluster.getStatus()).thenReturn(startingStatus, 
softwareConfigStatus, runningStatus, terminatedWithErrorsStatus);
 
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrJobFlowTask.getExitStatusCode());
     }
 
     @Test
     public void testCanNotParseJson() throws Exception {
         mockStatic(JSONUtils.class);
-        when(emrTask, "createRunJobFlowRequest").thenThrow(new 
EmrTaskException("can not parse RunJobFlowRequest from json", new 
Exception("error")));
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+        when(emrJobFlowTask, "createRunJobFlowRequest").thenThrow(new 
EmrTaskException("can not parse RunJobFlowRequest from json", new 
Exception("error")));
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrJobFlowTask.getExitStatusCode());
     }
 
     @Test
@@ -188,18 +188,18 @@ public class EmrTaskTest {
 
         when(emrClient.describeCluster(any())).thenReturn(null);
 
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrJobFlowTask.getExitStatusCode());
     }
 
     @Test
     public void testRunJobFlowError() throws Exception {
 
         when(emrClient.runJobFlow(any())).thenThrow(new 
AmazonElasticMapReduceException("error"), new EmrTaskException());
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
-        emrTask.handle();
-        Assert.assertEquals(EXIT_CODE_FAILURE, emrTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrJobFlowTask.getExitStatusCode());
+        emrJobFlowTask.handle();
+        Assert.assertEquals(EXIT_CODE_FAILURE, 
emrJobFlowTask.getExitStatusCode());
 
     }
 
@@ -212,6 +212,7 @@ public class EmrTaskTest {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        emrParameters.setProgramType(ProgramType.RUN_JOB_FLOW);
         emrParameters.setJobFlowDefineJson(jobFlowDefineJson);
 
         return JSONUtils.toJsonString(emrParameters);
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json
new file mode 100644
index 0000000000..a14a259a88
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsDefine.json
@@ -0,0 +1,17 @@
+{
+  "JobFlowId": "j-3V628TKAERHP8",
+  "Steps": [
+    {
+      "Name": "calculate_pi",
+      "ActionOnFailure": "CONTINUE",
+      "HadoopJarStep": {
+        "Jar": "command-runner.jar",
+        "Args": [
+          "/usr/lib/spark/bin/run-example",
+          "SparkPi",
+          "15"
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json
new file mode 100644
index 0000000000..46e81b13ba
--- /dev/null
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/resources/org/apache/dolphinscheduler/plugin/task/emr/EmrErrorAddStepsDefine.json
@@ -0,0 +1,29 @@
+{
+  "JobFlowId": "j-3V628TKAERHP8",
+  "Steps": [
+    {
+      "Name": "calculate_pi",
+      "ActionOnFailure": "CONTINUE",
+      "HadoopJarStep": {
+        "Jar": "command-runner.jar",
+        "Args": [
+          "/usr/lib/spark/bin/run-example",
+          "SparkPi",
+          "15"
+        ]
+      }
+    },
+    {
+      "Name": "calculate_pi",
+      "ActionOnFailure": "CONTINUE",
+      "HadoopJarStep": {
+        "Jar": "command-runner.jar",
+        "Args": [
+          "/usr/lib/spark/bin/run-example",
+          "SparkPi",
+          "15"
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts 
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index cf2cef1271..43cba0d3d6 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -607,6 +607,8 @@ export default {
     required: 'required',
     emr_flow_define_json: 'jobFlowDefineJson',
     emr_flow_define_json_tips: 'Please enter the definition of the job flow.',
+    emr_steps_define_json: 'stepsDefineJson',
+    emr_steps_define_json_tips: 'Please enter the definition of the emr step.',
     segment_separator: 'Segment Execution Separator',
     segment_separator_tips: 'Please enter the segment execution separator',
     zeppelin_note_id: 'zeppelinNoteId',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts 
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index ab2c9c71f4..3706771854 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -600,6 +600,8 @@ export default {
     required: '必填',
     emr_flow_define_json: 'jobFlowDefineJson',
     emr_flow_define_json_tips: '请输入工作流定义',
+    emr_steps_define_json: 'stepsDefineJson',
+    emr_steps_define_json_tips: '请输入EMR步骤定义',
     segment_separator: '分段执行符号',
     segment_separator_tips: '请输入分段执行符号',
     zeppelin_note_id: 'zeppelin_note_id',
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
index a67b370d18..1446fc866a 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr.ts
@@ -17,14 +17,30 @@
 import { useI18n } from 'vue-i18n'
 import { useCustomParams } from '.'
 import type { IJsonItem } from '../types'
+import {computed} from "vue";
 
 export function useEmr(model: { [field: string]: any }): IJsonItem[] {
   const { t } = useI18n()
 
+  const jobFlowDefineJsonSpan = computed(() => (model.programType === 
'RUN_JOB_FLOW' ? 24 : 0))
+
+  const stepsDefineJsonSpan = computed(() => (model.programType === 
'ADD_JOB_FLOW_STEPS' ? 24 : 0))
+
   return [
+    {
+      type: 'select',
+      field: 'programType',
+      span: 24,
+      name: t('project.node.program_type'),
+      options: PROGRAM_TYPES,
+      validate: {
+        required: true
+      }
+    },
     {
       type: 'editor',
       field: 'jobFlowDefineJson',
+      span: jobFlowDefineJsonSpan,
       name: t('project.node.emr_flow_define_json'),
       props: {
         language: 'json'
@@ -35,6 +51,20 @@ export function useEmr(model: { [field: string]: any }): 
IJsonItem[] {
         message: t('project.node.emr_flow_define_json_tips')
       }
     },
+    {
+      type: 'editor',
+      field: 'stepsDefineJson',
+      span: stepsDefineJsonSpan,
+      name: t('project.node.emr_steps_define_json'),
+      props: {
+        language: 'json'
+      },
+      validate: {
+        trigger: ['input', 'trigger'],
+        required: true,
+        message: t('project.node.emr_steps_define_json_tips')
+      }
+    },
     ...useCustomParams({
       model,
       field: 'localParams',
@@ -42,3 +72,14 @@ export function useEmr(model: { [field: string]: any }): 
IJsonItem[] {
     })
   ]
 }
+
+export const PROGRAM_TYPES = [
+  {
+    label: 'RUN_JOB_FLOW',
+    value: 'RUN_JOB_FLOW'
+  },
+  {
+    label: 'ADD_JOB_FLOW_STEPS',
+    value: 'ADD_JOB_FLOW_STEPS'
+  }
+]
\ No newline at end of file
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index c90d1660db..4ab18ccd05 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -321,7 +321,9 @@ export function formatParams(data: INodeData): {
 
   if (data.taskType === 'EMR') {
     taskParams.type = data.type
+    taskParams.programType = data.programType
     taskParams.jobFlowDefineJson = data.jobFlowDefineJson
+    taskParams.stepsDefineJson = data.stepsDefineJson
   }
 
   if (data.taskType === 'ZEPPELIN') {
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
index 1face687fb..36dbbefa79 100644
--- 
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
+++ 
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
@@ -44,6 +44,7 @@ export function useEmr({
     workerGroup: 'default',
     delayTime: 0,
     timeout: 30,
+    programType: 'ADD_JOB_FLOW_STEPS',
     timeoutNotifyStrategy: ['WARN']
   } as INodeData)
 
diff --git 
a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts 
b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 5019f29ce1..187ff7532f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -295,6 +295,7 @@ interface ITaskParams {
   ruleId?: number
   ruleInputParameter?: IRuleParameters
   jobFlowDefineJson?: string
+  stepsDefineJson?: string
   zeppelinNoteId?: string
   zeppelinParagraphId?: string
   noteId?: string

Reply via email to