This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new f4d540a1f9 [Fix-17534][Service/Master] Add global parameters and 
varpool from current workflow instance and add them to start params list of the 
trigger request of a sub workflow (#17578)
f4d540a1f9 is described below

commit f4d540a1f960a6a37ec403b433db47a5a8b78788
Author: Kris Vermeulen <[email protected]>
AuthorDate: Mon Nov 10 06:42:42 2025 +0100

    [Fix-17534][Service/Master] Add global parameters and varpool from current 
workflow instance and add them to start params list of the trigger request of a 
sub workflow (#17578)
---
 .../plugin/subworkflow/SubWorkflowLogicTask.java   |  39 +++++-
 ...lowInstanceGlobalParamsInheritanceTestCase.java | 122 +++++++++++++++++++
 ...flow_with_sub_workflows_with_global_params.yaml | 135 +++++++++++++++++++++
 3 files changed, 294 insertions(+), 2 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
index 89222ebc24..7e6ef28237 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/subworkflow/SubWorkflowLogicTask.java
@@ -17,6 +17,9 @@
 
 package 
org.apache.dolphinscheduler.server.master.engine.executor.plugin.subworkflow;
 
+import static java.util.Arrays.asList;
+import static 
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.deserializeVarPool;
+
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@@ -35,6 +38,7 @@ import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowI
 import 
org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import 
org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters;
 import 
org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask;
 import 
org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
@@ -43,6 +47,14 @@ import 
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteExce
 import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
 import 
org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
 
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.context.ApplicationContext;
@@ -222,6 +234,11 @@ public class SubWorkflowLogicTask extends 
AbstractLogicTask<SubWorkflowParameter
         final ICommandParam commandParam =
                 JSONUtils.parseObject(workflowInstance.getCommandParam(), 
ICommandParam.class);
 
+        final List<Property> paramList = mergeParams(asList(
+                new 
ArrayList<>(deserializeVarPool(workflowInstance.getGlobalParams())),
+                commandParam.getCommandParams(),
+                new 
ArrayList<>(deserializeVarPool(workflowInstance.getVarPool()))));
+
         final WorkflowManualTriggerRequest workflowManualTriggerRequest = 
WorkflowManualTriggerRequest.builder()
                 .userId(taskExecutionContext.getExecutorId())
                 .workflowDefinitionCode(subWorkflowDefinition.getCode())
@@ -233,8 +250,7 @@ public class SubWorkflowLogicTask extends 
AbstractLogicTask<SubWorkflowParameter
                 .workerGroup(workflowInstance.getWorkerGroup())
                 .tenantCode(workflowInstance.getTenantCode())
                 .environmentCode(workflowInstance.getEnvironmentCode())
-                // todo: transport varpool and local params
-                .startParamList(commandParam.getCommandParams())
+                .startParamList(paramList)
                 .dryRun(Flag.of(workflowInstance.getDryRun()))
                 .build();
         final Integer subWorkflowInstanceId = applicationContext
@@ -243,6 +259,25 @@ public class SubWorkflowLogicTask extends 
AbstractLogicTask<SubWorkflowParameter
         return SubWorkflowLogicTaskRuntimeContext.of(subWorkflowInstanceId);
     }
 
+    private List<Property> mergeParams(List<List<Property>> params) { // Merges several Property lists into one list holding a single Property per prop name.
+        if (CollectionUtils.isEmpty(params)) { // null or empty outer list: nothing to merge
+            return Collections.emptyList();
+        }
+        if (params.size() == 1) { // a single list is handed back as-is: no copy, no de-duplication, may be null
+            return params.get(0);
+        }
+        Map<String, Property> result = new HashMap<>(); // keyed by Property.getProp()
+        for (List<Property> param : params) {
+            if (CollectionUtils.isEmpty(param)) { // skip null/empty inner lists
+                continue;
+            }
+            for (Property property : param) {
+                result.put(property.getProp(), property); // a later list overrides an earlier one that uses the same prop name
+            }
+        }
+        return new ArrayList<>(result.values()); // fresh list; element order is HashMap iteration order, not insertion order
+    }
+
     private void upsertSubWorkflowRelation() {
         final WorkflowInstanceMapDao workflowInstanceMapDao = 
applicationContext.getBean(WorkflowInstanceMapDao.class);
         WorkflowInstanceRelation workflowInstanceRelation = 
workflowInstanceMapDao.queryWorkflowMapByParent(
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/SubWorkflowInstanceGlobalParamsInheritanceTestCase.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/SubWorkflowInstanceGlobalParamsInheritanceTestCase.java
new file mode 100644
index 0000000000..b7f2cb2fe9
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/SubWorkflowInstanceGlobalParamsInheritanceTestCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.integration.cases;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
+import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
+import 
org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import 
org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase;
+import org.apache.dolphinscheduler.server.master.integration.WorkflowOperator;
+import 
org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test validating that sub workflow instances inherit the global 
params of all their parent workflows.
+ * The global params are asserted on each sub workflow instance, and the fake_task 
in the sub-sub-workflow is used to verify them.
+ */
+class SubWorkflowInstanceGlobalParamsInheritanceTestCase extends 
AbstractMasterIntegrationTestCase {
+
+    @Test
+    @DisplayName("Test subflows inherit global params from all parent flows")
+    void testSubflowInheritsGlobalParamsFromParentFlows_with_oneSuccessTask() {
+        final String yaml = 
"/it/start/workflow_with_sub_workflows_with_global_params.yaml";
+        final WorkflowTestCaseContext context = 
workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = 
WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(parentWorkflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .build();
+        final Integer workflowInstanceId = 
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    Assertions
+                            
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .matches(workflowInstance -> 
workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
+                            .matches(workflowInstance -> 
workflowInstance.getIsSubWorkflow() == Flag.NO);
+
+                    final List<WorkflowInstance> subWorkflowInstance =
+                            
repository.queryWorkflowInstance(context.getWorkflows().get(1));
+                    Assertions
+                            .assertThat(subWorkflowInstance)
+                            .hasSize(1)
+                            .satisfiesExactly(workflowInstance -> {
+                                
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                                
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
+
+                                Assertions
+                                        .assertThat(
+                                                
JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class))
+                                        .hasSize(1)
+                                        .anySatisfy(property -> {
+                                            
assertThat(property.getProp()).isEqualTo("parentWorkflowParam");
+                                            
assertThat(property.getValue()).isEqualTo("parentWorkflowParamValue");
+                                        });
+                            });
+
+                    final List<WorkflowInstance> subSubWorkflowInstance =
+                            
repository.queryWorkflowInstance(context.getWorkflows().get(2));
+                    Assertions
+                            .assertThat(subSubWorkflowInstance)
+                            .hasSize(1)
+                            .satisfiesExactly(workflowInstance -> {
+                                
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
+                                
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
+
+                                Assertions
+                                        .assertThat(
+                                                
JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class))
+                                        .hasSize(2)
+                                        .anySatisfy(property -> {
+                                            
assertThat(property.getProp()).isEqualTo("parentWorkflowParam");
+                                            
assertThat(property.getValue()).isEqualTo("parentWorkflowParamValue");
+                                        })
+                                        .anySatisfy(property -> {
+                                            
assertThat(property.getProp()).isEqualTo("subWorkflowParam");
+                                            
assertThat(property.getValue()).isEqualTo("subWorkflowParamValue");
+                                        });
+                            });
+
+                    Assertions
+                            
.assertThat(repository.queryTaskInstance(subSubWorkflowInstance.get(0).getId()))
+                            .satisfiesExactly(taskInstance -> {
+                                
assertThat(taskInstance.getName()).isEqualTo("fake_task");
+                                
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            });
+                });
+
+        masterContainer.assertAllResourceReleased();
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_sub_workflows_with_global_params.yaml
 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_sub_workflows_with_global_params.yaml
new file mode 100644
index 0000000000..b3063b2023
--- /dev/null
+++ 
b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_sub_workflows_with_global_params.yaml
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_sub_workflow
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with single subflow task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    globalParams: >
+      [{
+        "prop": "parentWorkflowParam",
+        "value": "parentWorkflowParamValue",
+        "direct": "IN",
+        "type":"VARCHAR"
+      }]
+    executionType: PARALLEL
+  - name: subworkflow_with_another_subworkflow
+    code: 2
+    version: 1
+    projectCode: 1
+    description: This is a fake sub workflow with another single subflow task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+  - name: subworkflow_with_single_task
+    code: 3
+    version: 1
+    projectCode: 1
+    description: This is a fake sub sub workflow with single task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    globalParams: >
+      [{
+        "prop": "subWorkflowParam",
+        "value": "subWorkflowParamValue",
+        "direct": "IN",
+        "type":"VARCHAR"
+      }]
+    executionType: PARALLEL
+
+tasks:
+  - name: sub_workflow_task
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: SUB_WORKFLOW
+    taskParams: 
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":2}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: sub_sub_workflow_task
+    code: 2
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: SUB_WORKFLOW
+    taskParams: 
'{"localParams":[],"resourceList":[],"workflowDefinitionCode":3}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: fake_task
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":  "if [ 
\"${parentWorkflowParam}\" != \"parentWorkflowParamValue\" ]; then\n  exit 
1\nelif [ \"${subWorkflowParam}\" != \"subWorkflowParamValue\" ]; then\n  exit 
1\nelse\n  exit 0\nfi"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 2
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 2
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 3
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00

Reply via email to