This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0bc817e2b6 [Fix-17710][Master] Fix master task dispatch failure by
filtering out null map keys. (#17711)
0bc817e2b6 is described below
commit 0bc817e2b652dda92f5eb5f950cb7ad8690c1bbf
Author: njnu-seafish <[email protected]>
AuthorDate: Thu Dec 4 11:40:09 2025 +0800
[Fix-17710][Master] Fix master task dispatch failure by filtering out null
map keys. (#17711)
---
.../integration/cases/WorkflowStartTestCase.java | 44 ++
.../it/start/workflow_with_null_key_param.yaml | 108 ++++
.../service/expand/CuringParamsServiceImpl.java | 146 +++---
.../expand/CuringParamsServiceImplTest.java | 582 +++++++++++++++++++++
.../service/expand/CuringParamsServiceTest.java | 273 ----------
.../api/parameters/AbstractParametersTest.java | 84 +++
.../plugin/task/shell/ShellTaskChannelTest.java | 136 +++++
7 files changed, 1038 insertions(+), 335 deletions(-)
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
index 80eddee923..a2f0331f34 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java
@@ -775,6 +775,50 @@ public class WorkflowStartTestCase extends
AbstractMasterIntegrationTestCase {
masterContainer.assertAllResourceReleased();
}
+ @Test
+ @DisplayName("Test start a workflow which using null key params")
+ public void testStartWorkflow_usingNullKeyParam() {
+ final String yaml = "/it/start/workflow_with_null_key_param.yaml";
+ final WorkflowTestCaseContext context =
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+ final WorkflowDefinition workflow = context.getOneWorkflow();
+
+ final RunWorkflowCommandParam runWorkflowCommandParam =
RunWorkflowCommandParam.builder()
+ .commandParams(Lists.newArrayList(Property.builder()
+ .prop(null)
+ .direct(Direct.IN)
+ .type(DataType.VARCHAR)
+ .value("commandParam")
+ .build()))
+ .build();
+
+ final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO =
WorkflowOperator.WorkflowTriggerDTO.builder()
+ .workflowDefinition(workflow)
+ .runWorkflowCommandParam(runWorkflowCommandParam)
+ .build();
+ workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+ await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(() -> {
+ Assertions
+
.assertThat(repository.queryWorkflowInstance(workflow))
+ .satisfiesExactly(workflowInstance ->
assertThat(workflowInstance.getState())
+
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
+ Assertions
+ .assertThat(repository.queryTaskInstance(workflow))
+ .hasSize(2)
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("A");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ })
+ .anySatisfy(taskInstance -> {
+
assertThat(taskInstance.getName()).isEqualTo("B");
+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+ });
+ });
+ masterContainer.assertAllResourceReleased();
+ }
+
@Test
@DisplayName("Test start a workflow with one fake task(A) failed")
public void testStartWorkflow_with_oneFailedTask() {
diff --git
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_null_key_param.yaml
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_null_key_param.yaml
new file mode 100644
index 0000000000..17d3d990aa
--- /dev/null
+++
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_null_key_param.yaml
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+ name: MasterIntegrationTest
+ code: 1
+ description: This is a fake project
+ userId: 1
+ userName: admin
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+
+workflows:
+ - name: workflow_with_one_fake_task_success
+ code: 1
+ version: 1
+ projectCode: 1
+ description: This is a fake workflow with single task
+ releaseState: ONLINE
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ userId: 1
+ globalParams:
'[{"prop":null,"value":"workflowParam","direct":"IN","type":"VARCHAR"}]'
+ executionType: PARALLEL
+
+tasks:
+ - name: A
+ code: 1
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: >
+ {
+ "localParams": [
+ {
+ "prop": "",
+ "direct": "IN",
+ "type": "VARCHAR",
+ "value": ""
+ }
+ ],
+ "shellScript": "echo 111",
+ "resourceList": []
+ }
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+ - name: B
+ code: 2
+ version: 1
+ projectCode: 1
+ userId: 1
+ taskType: LogicFakeTask
+ taskParams: >
+ {
+ "localParams": [
+ {
+ "prop": null,
+ "direct": "IN",
+ "type": "VARCHAR",
+ "value": ""
+ }
+ ],
+ "shellScript": "echo 111",
+ "resourceList": []
+ }
+ workerGroup: default
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ taskExecuteType: BATCH
+
+
+
+taskRelations:
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 1
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
+ - projectCode: 1
+ workflowDefinitionCode: 1
+ workflowDefinitionVersion: 1
+ preTaskCode: 0
+ preTaskVersion: 0
+ postTaskCode: 2
+ postTaskVersion: 1
+ createTime: 2024-08-12 00:00:00
+ updateTime: 2024-08-12 00:00:00
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
index d98b2cfc4c..225bb06917 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
@@ -48,6 +48,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -55,7 +56,6 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -66,10 +66,12 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+@Slf4j
@Component
public class CuringParamsServiceImpl implements CuringParamsService {
@@ -160,7 +162,13 @@ public class CuringParamsServiceImpl implements
CuringParamsService {
}
/**
- * Generate prepare params include project params, global parameters,
local parameters, built-in parameters, varpool, start-up params.
+ * Prepares the final map of task execution parameters by merging
parameters from multiple sources
+ * in a well-defined priority order. The resulting map is guaranteed to
contain only valid entries:
+ * <ul>
+ * <li>Keys are non-null and non-blank strings</li>
+ * <li>Values are non-null {@link Property} objects</li>
+ * </ul>
+ *
* <p> The priority of the parameters is as follows:
* <p> varpool > command parameters > local parameters > global parameters
> project parameters > built-in parameters
* todo: Use TaskRuntimeParams to represent this.
@@ -180,87 +188,101 @@ public class CuringParamsServiceImpl implements
CuringParamsService {
String
workflowDefinitionName) {
Map<String, Property> prepareParamsMap = new HashMap<>();
- // assign value to definedParams here
- Map<String, Property> globalParams =
parseGlobalParamsMap(workflowInstance);
-
- // combining local and global parameters
- Map<String, Property> localParams =
parameters.getInputLocalParametersMap();
-
- // stream pass params
- List<Property> varPools = parseVarPool(taskInstance);
-
- // if it is a complement,
- // you need to pass in the task instance id to locate the time
- // of the process instance complement
+ // If it is a complement, you need to pass in the task instance id
+ // to locate the time of the process instance complement.
ICommandParam commandParam =
JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class);
+ if (commandParam == null) {
+ throw new ServiceException(String.format("Failed to parse command
parameter for workflow instance %s",
+ workflowInstance.getId()));
+ }
String timeZone = commandParam.getTimeZone();
- // built-in params
- Map<String, String> builtInParams =
- setBuiltInParamsMap(taskInstance, workflowInstance, timeZone,
projectName, workflowDefinitionName);
+ // 1. Built-in parameters (lowest precedence)
+ Map<String, String> builtInParams = setBuiltInParamsMap(
+ taskInstance, workflowInstance, timeZone, projectName,
workflowDefinitionName);
+ safePutAll(prepareParamsMap,
ParameterUtils.getUserDefParamsMap(builtInParams));
- // project-level params
+ // 2. Project-level parameters
Map<String, Property> projectParams =
getProjectParameterMap(taskInstance.getProjectCode());
+ safePutAll(prepareParamsMap, projectParams);
- if (MapUtils.isNotEmpty(builtInParams)) {
-
prepareParamsMap.putAll(ParameterUtils.getUserDefParamsMap(builtInParams));
- }
-
- if (MapUtils.isNotEmpty(projectParams)) {
- prepareParamsMap.putAll(projectParams);
- }
-
- if (MapUtils.isNotEmpty(globalParams)) {
- prepareParamsMap.putAll(globalParams);
- }
+ // 3. Workflow global parameters
+ Map<String, Property> globalParams =
parseGlobalParamsMap(workflowInstance);
+ safePutAll(prepareParamsMap, globalParams);
- if (MapUtils.isNotEmpty(localParams)) {
- prepareParamsMap.putAll(localParams);
- }
+ // 4. Task-local parameters
+ Map<String, Property> localParams =
parameters.getInputLocalParametersMap();
+ safePutAll(prepareParamsMap, localParams);
+ // 5. Command-line / complement parameters
if (CollectionUtils.isNotEmpty(commandParam.getCommandParams())) {
- prepareParamsMap.putAll(commandParam.getCommandParams().stream()
- .collect(Collectors.toMap(Property::getProp,
Function.identity())));
+ Map<String, Property> commandParamsMap =
commandParam.getCommandParams().stream()
+ .filter(prop -> StringUtils.isNotBlank(prop.getProp()))
+ .collect(Collectors.toMap(
+ Property::getProp,
+ Function.identity(),
+ (v1, v2) -> v2 // retain last on duplicate key
+ ));
+ safePutAll(prepareParamsMap, commandParamsMap);
}
+ // 6. VarPool: override values only for existing IN-direction
parameters
+ List<Property> varPools = parseVarPool(taskInstance);
if (CollectionUtils.isNotEmpty(varPools)) {
- // overwrite the in parameter by varPool
for (Property varPool : varPools) {
- Property property = prepareParamsMap.get(varPool.getProp());
- if (property == null || property.getDirect() != Direct.IN) {
+ if (StringUtils.isBlank(varPool.getProp())) {
continue;
}
- property.setValue(varPool.getValue());
+ Property targetParam = prepareParamsMap.get(varPool.getProp());
+ if (targetParam != null &&
Direct.IN.equals(targetParam.getDirect())) {
+ targetParam.setValue(varPool.getValue());
+ }
}
}
- Iterator<Map.Entry<String, Property>> iter =
prepareParamsMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, Property> en = iter.next();
- Property property = en.getValue();
-
- if (StringUtils.isNotEmpty(property.getValue())
- &&
property.getValue().contains(Constants.FUNCTION_START_WITH)) {
- /**
- * local parameter refers to global parameter with the same
name
- * note: the global parameters of the process instance here
are solidified parameters,
- * and there are no variables in them.
- */
- String val = property.getValue();
-
- // handle some chain parameter assign, such as `{"var1":
"${var2}", "var2": 1}` should be convert to
- // `{"var1": 1, "var2": 1}`
- val = convertParameterPlaceholders(val, prepareParamsMap);
- property.setValue(val);
- }
+ // 7. Inject business/scheduling parameters (e.g., ${datetime}), which
may contain or reference placeholders
+ Map<String, Property> businessParams =
preBuildBusinessParams(workflowInstance);
+ safePutAll(prepareParamsMap, businessParams);
+
+ // 8. Resolve all placeholders (e.g., "${output_dir}") using the
current parameter context
+ resolvePlaceholders(prepareParamsMap);
+
+ return prepareParamsMap;
+ }
+
+ /**
+ * Safely merges entries from the {@code source} map into the {@code
target} map,
+ * skipping any entry with a {@code null}, empty, or blank key, or a
{@code null} value.
+ *
+ * @param target the destination map to merge into (must not be null)
+ * @param source the source map whose valid entries will be copied (may be
null or empty)
+ */
+ private void safePutAll(Map<String, Property> target, Map<String,
Property> source) {
+ if (MapUtils.isEmpty(source)) {
+ return;
}
+ source.forEach((key, value) -> {
+ if (StringUtils.isNotBlank(key) && value != null) {
+ target.put(key, value);
+ } else {
+ log.warn("Skipped invalid parameter entry: key='{}',
value={}", key, value);
+ }
+ });
+ }
- // put schedule time param to params map
- Map<String, Property> paramsMap =
preBuildBusinessParams(workflowInstance);
- if (MapUtils.isNotEmpty(paramsMap)) {
- prepareParamsMap.putAll(paramsMap);
+ /**
+ * Resolves placeholder expressions (e.g., "${var}") in parameter values
by substituting them
+ * with actual values from the current {@code paramsMap}.
+ *
+ * @param paramsMap the map of parameters (key: parameter name, value:
{@link Property}) to resolve
+ */
+ private void resolvePlaceholders(Map<String, Property> paramsMap) {
+ for (Property prop : paramsMap.values()) {
+ String val = prop.getValue();
+ if (StringUtils.isNotEmpty(val) &&
val.contains(Constants.FUNCTION_START_WITH)) {
+ prop.setValue(convertParameterPlaceholders(val, paramsMap));
+ }
}
- return prepareParamsMap;
}
/**
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java
new file mode 100644
index 0000000000..8d63421988
--- /dev/null
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import org.apache.dolphinscheduler.common.constants.DateConstants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.ProjectParameter;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import org.apache.dolphinscheduler.dao.mapper.ProjectParameterMapper;
+import
org.apache.dolphinscheduler.extract.master.command.BackfillWorkflowCommandParam;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters;
+
+import org.apache.commons.collections4.MapUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.google.common.collect.Lists;
+import com.google.common.truth.Truth;
+
+@ExtendWith(MockitoExtension.class)
+public class CuringParamsServiceImplTest {
+
+ private static final String YESTERDAY_DATE_PLACEHOLDER = "$[yyyy-MM-dd-1]";
+
+ @Mock
+ private CuringParamsService curingParamsService;
+
+ @InjectMocks
+ private CuringParamsServiceImpl curingParamsServiceImpl;
+
+ @Mock
+ private ProjectParameterMapper projectParameterMapper;
+
+ private final Map<String, Property> paramMap = new HashMap<>();
+
+ @BeforeEach
+ public void init() {
+ paramMap.put("globalParams1", new Property("globalParams1", Direct.IN,
DataType.VARCHAR, "Params1"));
+ }
+
+ @Test
+ public void testConvertParameterPlaceholders() {
+
Mockito.when(curingParamsService.convertParameterPlaceholders(YESTERDAY_DATE_PLACEHOLDER,
paramMap))
+ .thenReturn("2022-06-26");
+ String result =
curingParamsService.convertParameterPlaceholders(YESTERDAY_DATE_PLACEHOLDER,
paramMap);
+ Assertions.assertNotNull(result);
+ }
+
+ @Test
+ public void testCuringGlobalParams() {
+ // define globalMap
+ Map<String, String> globalParamMap = new HashMap<>();
+ globalParamMap.put("globalParams1", "Params1");
+
+ // define globalParamList
+ List<Property> globalParamList = new ArrayList<>();
+
+ // define scheduleTime
+ Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
+
+ // test globalParamList is empty or null
+ String result = curingParamsServiceImpl.curingGlobalParams(1,
globalParamMap, globalParamList,
+ CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assertions.assertNull(result);
+ Assertions.assertNull(curingParamsServiceImpl.curingGlobalParams(1,
null, null,
+ CommandType.START_CURRENT_TASK_PROCESS, null, null));
+ Assertions.assertNull(curingParamsServiceImpl.curingGlobalParams(1,
globalParamMap, null,
+ CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
+
+ // test globalParamList is not null
+ Property property = new Property("testGlobalParam", Direct.IN,
DataType.VARCHAR, "testGlobalParam");
+ globalParamList.add(property);
+
+ String result2 = curingParamsServiceImpl.curingGlobalParams(1, null,
globalParamList,
+ CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assertions.assertEquals(result2,
JSONUtils.toJsonString(globalParamList));
+
+ String result3 = curingParamsServiceImpl.curingGlobalParams(1,
globalParamMap, globalParamList,
+ CommandType.START_CURRENT_TASK_PROCESS, null, null);
+ Assertions.assertEquals(result3,
JSONUtils.toJsonString(globalParamList));
+
+ String result4 = curingParamsServiceImpl.curingGlobalParams(1,
globalParamMap, globalParamList,
+ CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assertions.assertEquals(result4,
JSONUtils.toJsonString(globalParamList));
+
+ // test var $ startsWith
+ globalParamMap.put("bizDate", "${system.biz.date}");
+ globalParamMap.put("b1zCurdate", "${system.biz.curdate}");
+
+ Property property2 = new Property("testParamList1", Direct.IN,
DataType.VARCHAR, "testParamList");
+ Property property3 = new Property("testParamList2", Direct.IN,
DataType.VARCHAR, "{testParamList1}");
+ Property property4 = new Property("testParamList3", Direct.IN,
DataType.VARCHAR, "${b1zCurdate}");
+
+ globalParamList.add(property2);
+ globalParamList.add(property3);
+ globalParamList.add(property4);
+
+ String result5 = curingParamsServiceImpl.curingGlobalParams(1,
globalParamMap, globalParamList,
+ CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assertions.assertEquals(result5,
JSONUtils.toJsonString(globalParamList));
+
+ Property testStartParamProperty = new Property("testStartParam",
Direct.IN, DataType.VARCHAR, "");
+ globalParamList.add(testStartParamProperty);
+ Property testStartParam2Property =
+ new Property("testStartParam2", Direct.IN, DataType.VARCHAR,
"$[yyyy-MM-dd+1]");
+ globalParamList.add(testStartParam2Property);
+ globalParamMap.put("testStartParam", "");
+ globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
+
+ Map<String, String> startParamMap = new HashMap<>(2);
+ startParamMap.put("testStartParam", "$[yyyyMMdd]");
+
+ for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
+ String val = startParamMap.get(param.getKey());
+ if (val != null) {
+ param.setValue(val);
+ }
+ }
+
+ String result6 = curingParamsServiceImpl.curingGlobalParams(1,
globalParamMap, globalParamList,
+ CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assertions.assertEquals(result6,
JSONUtils.toJsonString(globalParamList));
+ }
+
+ @Test
+ public void testParamParsingPreparation() {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setId(1);
+ taskInstance.setExecutePath("home/path/execute");
+
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setName("TaskName-1");
+ taskDefinition.setCode(1000001L);
+
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+ workflowInstance.setId(2);
+ final BackfillWorkflowCommandParam backfillWorkflowCommandParam =
BackfillWorkflowCommandParam.builder()
+ .timeZone("Asia/Shanghai")
+ .build();
+
workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam));
+ workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
+ Property property = new Property();
+ property.setDirect(Direct.IN);
+ property.setProp("global_params");
+ property.setValue("hello world");
+ property.setType(DataType.VARCHAR);
+ List<Property> properties = Lists.newArrayList(property);
+ workflowInstance.setGlobalParams(JSONUtils.toJsonString(properties));
+
+ WorkflowDefinition workflowDefinition = new WorkflowDefinition();
+ workflowDefinition.setName("ProcessName-1");
+ workflowDefinition.setProjectName("ProjectName");
+ workflowDefinition.setProjectCode(3000001L);
+ workflowDefinition.setCode(200001L);
+
+ Project project = new Project();
+ project.setName("ProjectName");
+ project.setCode(3000001L);
+
+
workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
+ workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
+ taskInstance.setTaskCode(taskDefinition.getCode());
+ taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
+ taskInstance.setProjectCode(workflowDefinition.getProjectCode());
+ taskInstance.setWorkflowInstanceId(workflowInstance.getId());
+
+ AbstractParameters parameters = new SubWorkflowParameters();
+
+
Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());
+
+ Map<String, Property> propertyMap =
+ curingParamsServiceImpl.paramParsingPreparation(taskInstance,
parameters, workflowInstance,
+ project.getName(), workflowDefinition.getName());
+ Assertions.assertNotNull(propertyMap);
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_INSTANCE_ID).getValue(),
+ String.valueOf(taskInstance.getId()));
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_EXECUTE_PATH).getValue(),
+ taskInstance.getExecutePath());
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID).getValue(),
+ String.valueOf(workflowInstance.getId()));
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_PROJECT_NAME).getValue(),
+ workflowDefinition.getProjectName());
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_PROJECT_CODE).getValue(),
+ String.valueOf(workflowDefinition.getProjectCode()));
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_DEFINITION_CODE).getValue(),
+ String.valueOf(taskDefinition.getCode()));
+
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(),
+ String.valueOf(workflowDefinition.getCode()));
+ }
+
+ @Test
+ public void testParseWorkflowStartParam() {
+ Map<String, Property> result;
+ // empty cmd param
+ Map<String, String> startParamMap = new HashMap<>();
+ result =
curingParamsServiceImpl.parseWorkflowStartParam(startParamMap);
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+
+ // without the "StartParams" key
+ startParamMap.put("testStartParam", "$[yyyyMMdd]");
+ result =
curingParamsServiceImpl.parseWorkflowStartParam(startParamMap);
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+
+ startParamMap.put("StartParams", "{\"param1\":\"11111\",
\"param2\":\"22222\"}");
+ result =
curingParamsServiceImpl.parseWorkflowStartParam(startParamMap);
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(2, result.keySet().size());
+ Assertions.assertEquals("11111", result.get("param1").getValue());
+ Assertions.assertEquals("22222", result.get("param2").getValue());
+ }
+
+ @Test
+ public void testParseWorkflowFatherParam() {
+ Map<String, Property> result;
+ // empty cmd param
+ Map<String, String> startParamMap = new HashMap<>();
+ result =
curingParamsServiceImpl.parseWorkflowFatherParam(startParamMap);
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+
+ // without the "fatherParams" key
+ startParamMap.put("testfatherParams", "$[yyyyMMdd]");
+ result =
curingParamsServiceImpl.parseWorkflowFatherParam(startParamMap);
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+
+ startParamMap.put("fatherParams", "{\"param1\":\"11111\",
\"param2\":\"22222\"}");
+ result =
curingParamsServiceImpl.parseWorkflowFatherParam(startParamMap);
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(2, result.keySet().size());
+ Assertions.assertEquals("11111", result.get("param1").getValue());
+ Assertions.assertEquals("22222", result.get("param2").getValue());
+ }
+
+ @Test
+ public void testParseGlobalParamsMap() throws Exception {
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+ workflowInstance.setGlobalParams(
+
"[{\"prop\":\"param1\",\"value\":\"11111\"},{\"prop\":\"param2\",\"value\":\"22222\"}]");
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseGlobalParamsMap",
WorkflowInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Property> result = (Map<String, Property>)
method.invoke(curingParamsServiceImpl, workflowInstance);
+
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(2, result.keySet().size());
+ Assertions.assertEquals("11111", result.get("param1").getValue());
+ Assertions.assertEquals("22222", result.get("param2").getValue());
+ }
+
+ @Test
+ public void testParseGlobalParamsMap_whenGlobalParamsIsNull() throws
Exception {
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+ workflowInstance.setGlobalParams(null);
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseGlobalParamsMap",
WorkflowInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Property> result = (Map<String, Property>)
method.invoke(curingParamsServiceImpl, workflowInstance);
+
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+ }
+
+ @Test
+ public void testParseGlobalParamsMap_whenGlobalParamsIsEmpty() throws
Exception {
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+ workflowInstance.setGlobalParams("");
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseGlobalParamsMap",
WorkflowInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Property> result = (Map<String, Property>)
method.invoke(curingParamsServiceImpl, workflowInstance);
+
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+ }
+
+ @Test
+ public void testParseGlobalParamsMap_withNullProp() throws Exception {
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+
workflowInstance.setGlobalParams("[{\"prop\":null,\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseGlobalParamsMap",
WorkflowInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Property> result = (Map<String, Property>)
method.invoke(curingParamsServiceImpl, workflowInstance);
+
+ // The current implementation will include a null key
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(1, result.size());
+ Assertions.assertTrue(result.containsKey(null));
+ Assertions.assertEquals("", result.get(null).getValue());
+ }
+
+ @Test
+ public void testParseVarPool_withValidVarPool() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setVarPool("[{\"prop\":\"var1\",\"value\":\"val1\"}]");
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseVarPool",
TaskInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ List<Property> result = (List<Property>)
method.invoke(curingParamsServiceImpl, taskInstance);
+
+ Assertions.assertFalse(result.isEmpty());
+ Assertions.assertEquals(1, result.size());
+ Assertions.assertEquals("var1", result.get(0).getProp());
+ Assertions.assertEquals("val1", result.get(0).getValue());
+ }
+
+ @Test
+ public void testParseVarPool_withNullVarPool() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setVarPool(null);
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseVarPool",
TaskInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ List<Property> result = (List<Property>)
method.invoke(curingParamsServiceImpl, taskInstance);
+
+ Assertions.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testParseVarPool_withEmptyVarPool() throws Exception {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setVarPool("");
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseVarPool",
TaskInstance.class);
+ method.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ List<Property> result = (List<Property>)
method.invoke(curingParamsServiceImpl, taskInstance);
+
+ Assertions.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testParseVarPool_withBlankVarPool_throwsException() throws
NoSuchMethodException {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setVarPool(" ");
+
+ Method method =
CuringParamsServiceImpl.class.getDeclaredMethod("parseVarPool",
TaskInstance.class);
+ method.setAccessible(true);
+
+ InvocationTargetException exception =
Assertions.assertThrows(InvocationTargetException.class,
+ () -> method.invoke(curingParamsServiceImpl, taskInstance));
+
+ // Check the root cause
+
Truth.assertThat(exception.getCause()).isInstanceOf(IllegalArgumentException.class);
+ Truth.assertThat(exception.getCause().getMessage()).contains("Parse
json");
+ }
+
+ @Test
+ public void testPreBuildBusinessParams_withScheduleTime() {
+ // 1234567890 ms since epoch = 1970-01-15T06:56:07Z
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+ workflowInstance.setScheduleTime(new Date(1234567890L));
+
+ Map<String, Property> result =
curingParamsServiceImpl.preBuildBusinessParams(workflowInstance);
+
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(1, result.size());
+
Assertions.assertTrue(result.containsKey(DateConstants.PARAMETER_DATETIME));
+ // Expect UTC time string
+ Assertions.assertEquals("19700115065607",
result.get(DateConstants.PARAMETER_DATETIME).getValue());
+ }
+
+ @Test
+ public void testPreBuildBusinessParams_withoutScheduleTime() {
+ WorkflowInstance workflowInstance = new WorkflowInstance();
+ workflowInstance.setScheduleTime(null);
+
+ Map<String, Property> result =
curingParamsServiceImpl.preBuildBusinessParams(workflowInstance);
+
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+ }
+
+ @Test
+ public void testGetProjectParameterMap_withParameters() {
+ long projectCode = 123456L;
+
+ ProjectParameter param1 = new ProjectParameter();
+ param1.setParamName("env");
+ param1.setParamValue("prod");
+ param1.setParamDataType("VARCHAR");
+
+ ProjectParameter param2 = new ProjectParameter();
+ param2.setParamName("timeout");
+ param2.setParamValue("30");
+ param2.setParamDataType("INTEGER");
+
+ List<ProjectParameter> mockList = Arrays.asList(param1, param2);
+
Mockito.when(projectParameterMapper.queryByProjectCode(projectCode)).thenReturn(mockList);
+
+ Map<String, Property> result =
curingParamsServiceImpl.getProjectParameterMap(projectCode);
+
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertTrue(result.containsKey("env"));
+ Assertions.assertTrue(result.containsKey("timeout"));
+
+ Property envProp = result.get("env");
+ Assertions.assertEquals("prod", envProp.getValue());
+ Assertions.assertEquals(Direct.IN, envProp.getDirect());
+ Assertions.assertEquals(DataType.VARCHAR, envProp.getType());
+
+ Property timeoutProp = result.get("timeout");
+ Assertions.assertEquals("30", timeoutProp.getValue());
+ Assertions.assertEquals(DataType.INTEGER, timeoutProp.getType());
+ }
+
+ @Test
+ public void testGetProjectParameterMap_withNullParamName() {
+ long projectCode = 123456L;
+
+ ProjectParameter param = new ProjectParameter();
+ param.setParamName(null); // ← null paramName
+ param.setParamValue("test-value");
+ param.setParamDataType("VARCHAR");
+
+ List<ProjectParameter> mockList = Collections.singletonList(param);
+
Mockito.when(projectParameterMapper.queryByProjectCode(projectCode)).thenReturn(mockList);
+
+ Map<String, Property> result =
curingParamsServiceImpl.getProjectParameterMap(projectCode);
+
+ Assertions.assertTrue(MapUtils.isNotEmpty(result));
+ Assertions.assertEquals(1, result.size());
+ Assertions.assertTrue(result.containsKey(null)); // null key is present
+ Property prop = result.get(null);
+ Assertions.assertEquals("test-value", prop.getValue());
+ Assertions.assertEquals(DataType.VARCHAR, prop.getType());
+ }
+
+ @Test
+ public void testGetProjectParameterMap_withNoParameters() {
+ long projectCode = 999L;
+
Mockito.when(projectParameterMapper.queryByProjectCode(projectCode)).thenReturn(Collections.emptyList());
+
+ Map<String, Property> result =
curingParamsServiceImpl.getProjectParameterMap(projectCode);
+
+ Assertions.assertTrue(MapUtils.isEmpty(result));
+ }
+
+ @Test
+ public void testSafePutAll() throws Exception {
+ // Arrange
+ Map<String, Property> target = new HashMap<>();
+ Map<String, Property> source = new HashMap<>();
+
+ Property prop1 = new Property();
+ prop1.setProp("validKey");
+ prop1.setValue("validValue");
+
+ Property prop2 = new Property();
+ prop2.setProp(""); // invalid: blank key
+ prop2.setValue("shouldBeSkipped");
+
+ Property prop3 = new Property();
+ prop3.setProp("anotherValid");
+ prop3.setValue("anotherValue");
+
+ source.put("validKey", prop1);
+ source.put("", prop2); // blank key → should be skipped
+ source.put("anotherValid", prop3);
+ source.put(null, prop1); // null key → should be skipped
+ source.put("nullValueKey", null); // null value → should be skipped
+
+ // Get private method
+ Method method = CuringParamsServiceImpl.class.getDeclaredMethod(
+ "safePutAll",
+ Map.class,
+ Map.class);
+ method.setAccessible(true);
+
+ // Act
+ method.invoke(curingParamsServiceImpl, target, source);
+
+ // Assert
+ Assertions.assertEquals(2, target.size());
+ Assertions.assertTrue(target.containsKey("validKey"));
+ Assertions.assertTrue(target.containsKey("anotherValid"));
+ Assertions.assertEquals("validValue",
target.get("validKey").getValue());
+ Assertions.assertEquals("anotherValue",
target.get("anotherValid").getValue());
+
+ // Ensure invalid entries were NOT added
+ Assertions.assertFalse(target.containsKey(""));
+ Assertions.assertFalse(target.containsKey(null));
+ }
+
+ @Test
+ public void testResolvePlaceholders() throws Exception {
+ // Arrange: prepare a paramsMap with placeholders and references
+ Map<String, Property> paramsMap = new HashMap<>();
+
+ Property p1 = new Property();
+ p1.setProp("name");
+ p1.setValue("Alice");
+
+ Property p2 = new Property();
+ p2.setProp("greeting");
+ p2.setValue("Hello, ${name}!"); // contains placeholder
+
+ Property p3 = new Property();
+ p3.setProp("farewell");
+ p3.setValue("${greeting} Goodbye."); // chained reference
+
+ Property p4 = new Property();
+ p4.setProp("static");
+ p4.setValue("no placeholder"); // should remain unchanged
+
+ paramsMap.put("name", p1);
+ paramsMap.put("greeting", p2);
+ paramsMap.put("farewell", p3);
+ paramsMap.put("static", p4);
+
+ // Get the private method via reflection
+ Method method = CuringParamsServiceImpl.class.getDeclaredMethod(
+ "resolvePlaceholders",
+ Map.class);
+ method.setAccessible(true);
+
+ // Act: invoke the private method
+ method.invoke(curingParamsServiceImpl, paramsMap);
+
+ // Assert: check that placeholders were resolved correctly
+ Assertions.assertEquals("Alice", paramsMap.get("name").getValue()); //
unchanged
+ Assertions.assertEquals("Hello, Alice!",
paramsMap.get("greeting").getValue());
+ Assertions.assertEquals("Hello, Alice! Goodbye.",
paramsMap.get("farewell").getValue());
+ Assertions.assertEquals("no placeholder",
paramsMap.get("static").getValue());
+
+ // Ensure no unintended side effects
+ Assertions.assertEquals(4, paramsMap.size());
+ }
+}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java
deleted file mode 100644
index f0f2874a9d..0000000000
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.expand;
-
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
-import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
-import org.apache.dolphinscheduler.dao.mapper.ProjectParameterMapper;
-import
org.apache.dolphinscheduler.extract.master.command.BackfillWorkflowCommandParam;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import
org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters;
-
-import org.apache.commons.collections4.MapUtils;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import com.google.common.collect.Lists;
-
-@ExtendWith(MockitoExtension.class)
-public class CuringParamsServiceTest {
-
- private static final String placeHolderName = "$[yyyy-MM-dd-1]";
-
- @Mock
- private CuringParamsService curingGlobalParamsService;
-
- @InjectMocks
- private CuringParamsServiceImpl dolphinSchedulerCuringGlobalParams;
-
- @Mock
- private ProjectParameterMapper projectParameterMapper;
-
- private final Map<String, String> globalParamMap = new HashMap<>();
- private final Map<String, Property> paramMap = new HashMap<>();
-
- @BeforeEach
- public void init() {
- globalParamMap.put("globalParams1", "Params1");
- paramMap.put("globalParams1", new Property("globalParams1", Direct.IN,
DataType.VARCHAR, "Params1"));
- }
-
- @Test
- public void testConvertParameterPlaceholders() {
-
Mockito.when(curingGlobalParamsService.convertParameterPlaceholders(placeHolderName,
paramMap))
- .thenReturn("2022-06-26");
- String result =
curingGlobalParamsService.convertParameterPlaceholders(placeHolderName,
paramMap);
- Assertions.assertNotNull(result);
- }
-
- @Test
- public void testCuringGlobalParams() {
- // define globalMap
- Map<String, String> globalParamMap = new HashMap<>();
- globalParamMap.put("globalParams1", "Params1");
-
- // define globalParamList
- List<Property> globalParamList = new ArrayList<>();
-
- // define scheduleTime
- Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
-
- // test globalParamList is null
- String result =
dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap,
globalParamList,
- CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assertions.assertNull(result);
-
Assertions.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1,
null, null,
- CommandType.START_CURRENT_TASK_PROCESS, null, null));
-
Assertions.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1,
globalParamMap, null,
- CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
-
- // test globalParamList is not null
- Property property = new Property("testGlobalParam", Direct.IN,
DataType.VARCHAR, "testGlobalParam");
- globalParamList.add(property);
-
- String result2 =
dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, globalParamList,
- CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assertions.assertEquals(result2,
JSONUtils.toJsonString(globalParamList));
-
- String result3 =
dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap,
globalParamList,
- CommandType.START_CURRENT_TASK_PROCESS, null, null);
- Assertions.assertEquals(result3,
JSONUtils.toJsonString(globalParamList));
-
- String result4 =
dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap,
globalParamList,
- CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assertions.assertEquals(result4,
JSONUtils.toJsonString(globalParamList));
-
- // test var $ startsWith
- globalParamMap.put("bizDate", "${system.biz.date}");
- globalParamMap.put("b1zCurdate", "${system.biz.curdate}");
-
- Property property2 = new Property("testParamList1", Direct.IN,
DataType.VARCHAR, "testParamList");
- Property property3 = new Property("testParamList2", Direct.IN,
DataType.VARCHAR, "{testParamList1}");
- Property property4 = new Property("testParamList3", Direct.IN,
DataType.VARCHAR, "${b1zCurdate}");
-
- globalParamList.add(property2);
- globalParamList.add(property3);
- globalParamList.add(property4);
-
- String result5 =
dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap,
globalParamList,
- CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assertions.assertEquals(result5,
JSONUtils.toJsonString(globalParamList));
-
- Property testStartParamProperty = new Property("testStartParam",
Direct.IN, DataType.VARCHAR, "");
- globalParamList.add(testStartParamProperty);
- Property testStartParam2Property =
- new Property("testStartParam2", Direct.IN, DataType.VARCHAR,
"$[yyyy-MM-dd+1]");
- globalParamList.add(testStartParam2Property);
- globalParamMap.put("testStartParam", "");
- globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
-
- Map<String, String> startParamMap = new HashMap<>(2);
- startParamMap.put("testStartParam", "$[yyyyMMdd]");
-
- for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
- String val = startParamMap.get(param.getKey());
- if (val != null) {
- param.setValue(val);
- }
- }
-
- String result6 =
dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap,
globalParamList,
- CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assertions.assertEquals(result6,
JSONUtils.toJsonString(globalParamList));
- }
-
- @Test
- public void testParamParsingPreparation() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setExecutePath("home/path/execute");
-
- TaskDefinition taskDefinition = new TaskDefinition();
- taskDefinition.setName("TaskName-1");
- taskDefinition.setCode(1000001l);
-
- WorkflowInstance workflowInstance = new WorkflowInstance();
- workflowInstance.setId(2);
- final BackfillWorkflowCommandParam backfillWorkflowCommandParam =
BackfillWorkflowCommandParam.builder()
- .timeZone("Asia/Shanghai")
- .build();
-
workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam));
- workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
- Property property = new Property();
- property.setDirect(Direct.IN);
- property.setProp("global_params");
- property.setValue("hello world");
- property.setType(DataType.VARCHAR);
- List<Property> properties = Lists.newArrayList(property);
- workflowInstance.setGlobalParams(JSONUtils.toJsonString(properties));
-
- WorkflowDefinition workflowDefinition = new WorkflowDefinition();
- workflowDefinition.setName("ProcessName-1");
- workflowDefinition.setProjectName("ProjectName-1");
- workflowDefinition.setProjectCode(3000001L);
- workflowDefinition.setCode(200001L);
-
- Project project = new Project();
- project.setName("ProjectName");
- project.setCode(3000001L);
-
-
workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
- workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
- taskInstance.setTaskCode(taskDefinition.getCode());
- taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
- taskInstance.setProjectCode(workflowDefinition.getProjectCode());
- taskInstance.setWorkflowInstanceId(workflowInstance.getId());
-
- AbstractParameters parameters = new SubWorkflowParameters();
-
-
Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());
-
- Map<String, Property> propertyMap =
-
dolphinSchedulerCuringGlobalParams.paramParsingPreparation(taskInstance,
parameters, workflowInstance,
- project.getName(), workflowDefinition.getName());
- Assertions.assertNotNull(propertyMap);
-
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_INSTANCE_ID).getValue(),
- String.valueOf(taskInstance.getId()));
-
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_EXECUTE_PATH).getValue(),
- taskInstance.getExecutePath());
-
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID).getValue(),
- String.valueOf(workflowInstance.getId()));
- //
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME).getValue(),
- // processDefinition.getName());
- //
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_PROJECT_NAME).getValue(),
- // processDefinition.getProjectName());
-
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_PROJECT_CODE).getValue(),
- String.valueOf(workflowDefinition.getProjectCode()));
-
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_DEFINITION_CODE).getValue(),
- String.valueOf(taskDefinition.getCode()));
-
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(),
- String.valueOf(workflowDefinition.getCode()));
- }
-
- @Test
- public void testParseWorkflowStartParam() {
- Map<String, Property> result = new HashMap<>();
- // empty cmd param
- Map<String, String> startParamMap = new HashMap<>();
- result =
dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
- Assertions.assertTrue(MapUtils.isEmpty(result));
-
- // without key
- startParamMap.put("testStartParam", "$[yyyyMMdd]");
- result =
dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
- Assertions.assertTrue(MapUtils.isEmpty(result));
-
- startParamMap.put("StartParams", "{\"param1\":\"11111\",
\"param2\":\"22222\"}");
- result =
dolphinSchedulerCuringGlobalParams.parseWorkflowStartParam(startParamMap);
- Assertions.assertTrue(MapUtils.isNotEmpty(result));
- Assertions.assertEquals(2, result.keySet().size());
- Assertions.assertEquals("11111", result.get("param1").getValue());
- Assertions.assertEquals("22222", result.get("param2").getValue());
- }
-
- @Test
- public void testParseWorkflowFatherParam() {
- Map<String, Property> result = new HashMap<>();
- // empty cmd param
- Map<String, String> startParamMap = new HashMap<>();
- result =
dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
- Assertions.assertTrue(MapUtils.isEmpty(result));
-
- // without key
- startParamMap.put("testfatherParams", "$[yyyyMMdd]");
- result =
dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
- Assertions.assertTrue(MapUtils.isEmpty(result));
-
- startParamMap.put("fatherParams", "{\"param1\":\"11111\",
\"param2\":\"22222\"}");
- result =
dolphinSchedulerCuringGlobalParams.parseWorkflowFatherParam(startParamMap);
- Assertions.assertTrue(MapUtils.isNotEmpty(result));
- Assertions.assertEquals(2, result.keySet().size());
- Assertions.assertEquals("11111", result.get("param1").getValue());
- Assertions.assertEquals("22222", result.get("param2").getValue());
- }
-}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParametersTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParametersTest.java
index 435b5f60c2..7197dbe07e 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParametersTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParametersTest.java
@@ -30,6 +30,13 @@ import org.junit.jupiter.api.Test;
public class AbstractParametersTest {
+ /**
+ * Verifies that getInputLocalParametersMap() includes parameters with:
+ * - direct = null (treated as IN by default)
+ * - direct = Direct.IN
+ * and excludes parameters with:
+ * - direct = Direct.OUT
+ */
@Test
public void testGetInputLocalParametersMap() {
AbstractParameters parameters = new AbstractParameters() {
@@ -52,4 +59,81 @@ public class AbstractParametersTest {
Assertions.assertTrue(inputLocalParametersMap.containsKey("key1"));
Assertions.assertTrue(inputLocalParametersMap.containsKey("key2"));
}
+
+ /**
+ * Tests behavior when a Property has a null 'prop' field.
+ *
+ * ⚠️ WARNING: The current implementation will insert a (null, Property)
entry into the map,
+ * which causes JSON serialization to fail with:
+ * "Null key for a Map not allowed in JSON"
+ *
+ * This test exposes the risk. After fixing the method to skip null/empty
prop names,
+ * this test should assert size == 0.
+ */
+ @Test
+ public void
testGetInputLocalParametersMap_withNullProp_shouldNotPutNullKey() {
+ AbstractParameters parameters = new AbstractParameters() {
+
+ @Override
+ public boolean checkParameters() {
+ return false;
+ }
+ };
+
+ List<Property> localParams = new ArrayList<>();
+ // Dangerous: prop is null
+ localParams.add(new Property(null, Direct.IN, DataType.VARCHAR,
"dangerValue"));
+
+ parameters.setLocalParams(localParams);
+
+ Map<String, Property> inputLocalParametersMap =
parameters.getInputLocalParametersMap();
+
+ // Current behavior: null key is inserted
+ Assertions.assertEquals(1, inputLocalParametersMap.size());
+ Assertions.assertTrue(inputLocalParametersMap.containsKey(null)); // ❌
This breaks JSON serialization!
+
+ }
+
+ /**
+ * Tests behavior when a Property has an empty string as 'prop'.
+ * While Java allows empty string keys, they may cause issues downstream
(e.g., template parsing).
+ */
+ @Test
+ public void testGetInputLocalParametersMap_withEmptyProp() {
+ AbstractParameters parameters = new AbstractParameters() {
+
+ @Override
+ public boolean checkParameters() {
+ return false;
+ }
+ };
+
+ List<Property> localParams = new ArrayList<>();
+ localParams.add(new Property("", Direct.IN, DataType.VARCHAR,
"emptyKeyVal"));
+
+ parameters.setLocalParams(localParams);
+
+ Map<String, Property> inputLocalParametersMap =
parameters.getInputLocalParametersMap();
+
+ Assertions.assertEquals(1, inputLocalParametersMap.size());
+ Assertions.assertTrue(inputLocalParametersMap.containsKey(""));
+ }
+
+ /**
+ * Ensures the method handles null localParams gracefully (returns empty
map).
+ */
+ @Test
+ public void testGetInputLocalParametersMap_localParamsIsNull() {
+ AbstractParameters parameters = new AbstractParameters() {
+
+ @Override
+ public boolean checkParameters() {
+ return false;
+ }
+ };
+ parameters.setLocalParams(null);
+
+ Map<String, Property> result = parameters.getInputLocalParametersMap();
+ Assertions.assertEquals(0, result.size());
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/test/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTaskChannelTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/test/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTaskChannelTest.java
new file mode 100644
index 0000000000..0cafab1f2a
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/test/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTaskChannelTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.shell;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+public class ShellTaskChannelTest {
+
+ private final ShellTaskChannel shellTaskChannel = new ShellTaskChannel();
+
+ @Test
+ @DisplayName("parseParameters should return ShellParameters when given
valid JSON")
+ public void testParseParametersWithValidJson() {
+ String validTaskParams = "{\n" +
+ " \"rawScript\": \"echo 'hello world'\",\n" +
+ " \"localParams\": [\n" +
+ " {\n" +
+ " \"prop\": \"name\",\n" +
+ " \"direct\": \"IN\",\n" +
+ " \"type\": \"VARCHAR\",\n" +
+ " \"value\": \"test\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ AbstractParameters params =
shellTaskChannel.parseParameters(validTaskParams);
+
+ assertNotNull(params, "Parsed parameters should not be null");
+ assertInstanceOf(ShellParameters.class, params, "Should be instance of
ShellParameters");
+
+ ShellParameters shellParams = (ShellParameters) params;
+ assertEquals("echo 'hello world'", shellParams.getRawScript());
+ assertNotNull(shellParams.getLocalParams());
+ assertEquals(1, shellParams.getLocalParams().size());
+ assertEquals("name", shellParams.getLocalParams().get(0).getProp());
+ }
+
+ @Test
+ @DisplayName("parseParameters should parse task_params with simple script
and one resource")
+ public void testParseShellTaskParamsWithSimpleScript() {
+ String taskParams = "{\n" +
+ " \"localParams\": [],\n" +
+ " \"rawScript\": \"#!/bin/bash\\nset -e\\n\\n\\n \\necho
\\\"====================================\\\"\",\n"
+ +
+ " \"resourceList\": [\n" +
+ " {\n" +
+ " \"resourceName\":
\"hdfs://abc/dolphinscheduler/default/123.sql\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ // When
+ AbstractParameters parsed =
shellTaskChannel.parseParameters(taskParams);
+
+ // Then
+ assertNotNull(parsed, "Parsed parameters must not be null");
+ assertInstanceOf(ShellParameters.class, parsed, "Should be instance of
ShellParameters");
+
+ ShellParameters params = (ShellParameters) parsed;
+
+ // Verify rawScript
+ String expectedRawScript = "#!/bin/bash\nset -e\n\n\n \necho
\"====================================\"";
+ assertEquals(expectedRawScript, params.getRawScript(), "rawScript
content mismatch");
+
+ // Verify localParams is empty
+ assertNotNull(params.getLocalParams(), "localParams should not be
null");
+ assertEquals(0, params.getLocalParams().size(), "localParams should be
empty list");
+
+ // Verify resourceList
+ assertNotNull(params.getResourceList(), "resourceList should not be
null");
+ assertEquals(1, params.getResourceList().size(), "resourceList should
contain exactly one item");
+
+ ResourceInfo resource = params.getResourceList().get(0);
+ assertEquals(
+ "hdfs://abc/dolphinscheduler/default/123.sql",
+ resource.getResourceName(),
+ "Resource name does not match");
+ }
+
+ @Test
+ @DisplayName("parseParameters should return empty ShellParameters when
given empty JSON object")
+ public void testParseParametersWithEmptyJson() {
+ String emptyJson = "{}";
+ AbstractParameters params =
shellTaskChannel.parseParameters(emptyJson);
+ assertNotNull(params);
+ assertInstanceOf(ShellParameters.class, params);
+ assertNull(((ShellParameters) params).getRawScript());
+ }
+
+ @Test
+ @DisplayName("parseParameters should handle null input gracefully")
+ public void testParseParametersWithNullInput() {
+ assertNull(shellTaskChannel.parseParameters(null));
+ }
+
+ @Test
+ @DisplayName("parseParameters should handle empty string input")
+ public void testParseParametersWithEmptyString() {
+ assertNull(shellTaskChannel.parseParameters(""));
+ }
+
+ @Test
+ @DisplayName("parseParameters should throw exception on malformed JSON")
+ public void testParseParametersWithInvalidJson() {
+ String invalidJson = "{ rawScript: 'missing quotes' }";
+
+ assertThrows(RuntimeException.class, () -> {
+ shellTaskChannel.parseParameters(invalidJson);
+ });
+ }
+}