This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3446fd8ab1 EMR task support replace params placeholder (#15975)
3446fd8ab1 is described below

commit 3446fd8ab157974e31226635e277b6c56b6b7cb5
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 10 17:34:26 2024 +0800

    EMR task support replace params placeholder (#15975)
    
    Co-authored-by: Eric Gao <[email protected]>
---
 .../apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java | 9 +++++++--
 .../apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java  | 9 +++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index 753b206e21..13dc35c30a 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -126,11 +127,15 @@ public class EmrAddStepsTask extends AbstractEmrTask {
     protected AddJobFlowStepsRequest createAddJobFlowStepsRequest() {
 
         final AddJobFlowStepsRequest addJobFlowStepsRequest;
+        String jobStepDefineJson = null;
         try {
+            jobStepDefineJson = ParameterUtils.convertParameterPlaceholders(
+                    emrParameters.getStepsDefineJson(),
+                    
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
             addJobFlowStepsRequest =
-                    objectMapper.readValue(emrParameters.getStepsDefineJson(), 
AddJobFlowStepsRequest.class);
+                    objectMapper.readValue(jobStepDefineJson, 
AddJobFlowStepsRequest.class);
         } catch (JsonProcessingException e) {
-            throw new EmrTaskException("can not parse AddJobFlowStepsRequest 
from json", e);
+            throw new EmrTaskException("can not parse AddJobFlowStepsRequest 
from json: " + jobStepDefineJson, e);
         }
 
         // When a single task definition is associated with multiple steps, 
the state tracking will have high
diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index f4b0534065..8b772a1118 100644
--- 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -120,10 +121,14 @@ public class EmrJobFlowTask extends AbstractEmrTask {
     protected RunJobFlowRequest createRunJobFlowRequest() {
 
         final RunJobFlowRequest runJobFlowRequest;
+        String jobFlowDefineJson = null;
         try {
-            runJobFlowRequest = 
objectMapper.readValue(emrParameters.getJobFlowDefineJson(), 
RunJobFlowRequest.class);
+            jobFlowDefineJson = ParameterUtils.convertParameterPlaceholders(
+                    emrParameters.getJobFlowDefineJson(),
+                    
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
+            runJobFlowRequest = objectMapper.readValue(jobFlowDefineJson, 
RunJobFlowRequest.class);
         } catch (JsonProcessingException e) {
-            throw new EmrTaskException("can not parse RunJobFlowRequest from 
json", e);
+            throw new EmrTaskException("can not parse RunJobFlowRequest from 
json: " + jobFlowDefineJson, e);
         }
 
         return runJobFlowRequest;

Reply via email to