This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3446fd8ab1 EMR task support replace params placeholder (#15975)
3446fd8ab1 is described below
commit 3446fd8ab157974e31226635e277b6c56b6b7cb5
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 10 17:34:26 2024 +0800
EMR task support replace params placeholder (#15975)
Co-authored-by: Eric Gao <[email protected]>
---
.../apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java | 9 +++++++--
.../apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java | 9 +++++++--
2 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index 753b206e21..13dc35c30a 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.Collections;
import java.util.HashSet;
@@ -126,11 +127,15 @@ public class EmrAddStepsTask extends AbstractEmrTask {
protected AddJobFlowStepsRequest createAddJobFlowStepsRequest() {
final AddJobFlowStepsRequest addJobFlowStepsRequest;
+ String jobStepDefineJson = null;
try {
+ jobStepDefineJson = ParameterUtils.convertParameterPlaceholders(
+ emrParameters.getStepsDefineJson(),
+
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
addJobFlowStepsRequest =
- objectMapper.readValue(emrParameters.getStepsDefineJson(),
AddJobFlowStepsRequest.class);
+ objectMapper.readValue(jobStepDefineJson,
AddJobFlowStepsRequest.class);
} catch (JsonProcessingException e) {
- throw new EmrTaskException("can not parse AddJobFlowStepsRequest
from json", e);
+ throw new EmrTaskException("can not parse AddJobFlowStepsRequest
from json: " + jobStepDefineJson, e);
}
// When a single task definition is associated with multiple steps,
the state tracking will have high
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index f4b0534065..8b772a1118 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.Collections;
import java.util.HashSet;
@@ -120,10 +121,14 @@ public class EmrJobFlowTask extends AbstractEmrTask {
protected RunJobFlowRequest createRunJobFlowRequest() {
final RunJobFlowRequest runJobFlowRequest;
+ String jobFlowDefineJson = null;
try {
- runJobFlowRequest =
objectMapper.readValue(emrParameters.getJobFlowDefineJson(),
RunJobFlowRequest.class);
+ jobFlowDefineJson = ParameterUtils.convertParameterPlaceholders(
+ emrParameters.getJobFlowDefineJson(),
+
ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()));
+ runJobFlowRequest = objectMapper.readValue(jobFlowDefineJson,
RunJobFlowRequest.class);
} catch (JsonProcessingException e) {
- throw new EmrTaskException("can not parse RunJobFlowRequest from
json", e);
+ throw new EmrTaskException("can not parse RunJobFlowRequest from
json: " + jobFlowDefineJson, e);
}
return runJobFlowRequest;