This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new cc5ee492ad [Fix_#4051] Parallel branch isolation (#4068)
cc5ee492ad is described below
commit cc5ee492adb6476cdbf805d3171c29ca0c3d3b36
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Mon Sep 15 22:53:51 2025 +0200
[Fix_#4051] Parallel branch isolation (#4068)
---
.../handlers/CompositeContextNodeHandler.java | 36 +++++++++-------
.../workflow/parser/handlers/EventHandler.java | 1 -
.../workflow/parser/handlers/ParallelHandler.java | 48 +++++++++++++++++++---
.../workflow/parser/handlers/StateHandler.java | 29 +++++--------
...plier.java => CloneVariableActionSupplier.java} | 13 +++---
.../workflow/suppliers/SetValueActionSupplier.java | 3 +-
.../StaticFluentWorkflowApplicationTest.java | 19 +++++++++
.../workflow/actions/CloneVariableAction.java | 39 ++++++++++++++++++
.../src/main/resources/parallel.sw.json | 14 +++----
.../kogito/quarkus/workflows/ParallelStateIT.java | 18 ++++++--
10 files changed, 161 insertions(+), 59 deletions(-)
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/CompositeContextNodeHandler.java
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/CompositeContextNodeHandler.java
index 68791096b2..3285760238 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/CompositeContextNodeHandler.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/CompositeContextNodeHandler.java
@@ -51,6 +51,7 @@ import io.serverlessworkflow.api.sleep.Sleep;
import io.serverlessworkflow.api.workflow.Functions;
import static org.kie.kogito.internal.utils.ConversionUtils.isEmpty;
+import static
org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR;
import static
org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.exclusiveSplitNode;
import static
org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.subprocessNode;
import static
org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.timerNode;
@@ -74,27 +75,34 @@ public abstract class CompositeContextNodeHandler<S extends
State> extends State
}
protected final <T extends AbstractCompositeNodeFactory<?, ?>> T
handleActions(T embeddedSubProcess, List<Action> actions, String outputVar,
boolean shouldMerge) {
+ return handleActions(embeddedSubProcess, actions, outputVar,
shouldMerge, DEFAULT_WORKFLOW_VAR);
+ }
+
+ protected final <T extends AbstractCompositeNodeFactory<?, ?>> T
handleActions(T embeddedSubProcess, List<Action> actions, String outputVar,
boolean shouldMerge, String modelVar) {
+ NodeFactory<?, ?> startNode =
embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart");
+ NodeFactory<?, ?> currentNode = handleActions(embeddedSubProcess,
startNode, actions, outputVar, shouldMerge, modelVar);
+ connect(currentNode,
embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
+ return embeddedSubProcess;
+ }
+
+ protected <T extends AbstractCompositeNodeFactory<?, ?>> NodeFactory<?, ?>
handleActions(T embeddedSubProcess, NodeFactory<?, ?> currentNode, List<Action>
actions, String outputVar,
+ boolean shouldMerge, String modelVar) {
if (actions != null && !actions.isEmpty()) {
- NodeFactory<?, ?> startNode =
embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart");
- NodeFactory<?, ?> currentNode = startNode;
for (Action action : actions) {
- currentNode = connect(currentNode,
getActionNode(embeddedSubProcess, action, outputVar != null ? outputVar :
getVarName(), shouldMerge));
+ currentNode = connect(currentNode,
getActionNode(embeddedSubProcess, action, outputVar != null ? outputVar :
getVarName(), shouldMerge, modelVar));
}
- connect(currentNode,
embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
- } else {
-
connect(embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart"),
embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
}
- return embeddedSubProcess;
+ return currentNode;
}
protected final MakeNodeResult
getActionNode(RuleFlowNodeContainerFactory<?, ?> embeddedSubProcess,
Action action) {
- return getActionNode(embeddedSubProcess, action, getVarName(), true);
+ return getActionNode(embeddedSubProcess, action, getVarName(), true,
DEFAULT_WORKFLOW_VAR);
}
protected final MakeNodeResult
getActionNode(RuleFlowNodeContainerFactory<?, ?> embeddedSubProcess,
- Action action, String collectVar, boolean shouldMerge) {
- return addActionCondition(embeddedSubProcess, action,
addActionSleep(embeddedSubProcess, action,
processActionFilter(embeddedSubProcess, action, collectVar, shouldMerge)));
+ Action action, String collectVar, boolean shouldMerge, String
modelVar) {
+ return addActionCondition(embeddedSubProcess, action,
addActionSleep(embeddedSubProcess, action,
processActionFilter(embeddedSubProcess, action, collectVar, shouldMerge,
modelVar)));
}
private MakeNodeResult addActionCondition(RuleFlowNodeContainerFactory<?,
?> embeddedSubProcess, Action action, MakeNodeResult actionNode) {
@@ -143,7 +151,7 @@ public abstract class CompositeContextNodeHandler<S extends
State> extends State
}
private MakeNodeResult processActionFilter(RuleFlowNodeContainerFactory<?,
?> embeddedSubProcess,
- Action action, String collectVar, boolean shouldMerge) {
+ Action action, String collectVar, boolean shouldMerge, String
modelVar) {
ActionDataFilter actionFilter = action.getActionDataFilter();
String fromExpr = null;
String resultExpr = null;
@@ -157,13 +165,13 @@ public abstract class CompositeContextNodeHandler<S
extends State> extends State
}
if (action.getFunctionRef() != null) {
return filterAndMergeNode(embeddedSubProcess, collectVar,
fromExpr, resultExpr, toExpr, useData, shouldMerge,
- (factory, inputVar, outputVar) ->
addActionMetadata(getActionNode(factory, action.getFunctionRef(), inputVar,
outputVar), action));
+ (factory, inputVar, outputVar) ->
addActionMetadata(getActionNode(factory, action.getFunctionRef(), inputVar,
outputVar), action), modelVar);
} else if (action.getEventRef() != null) {
return filterAndMergeNode(embeddedSubProcess, collectVar,
fromExpr, resultExpr, toExpr, useData, shouldMerge,
- (factory, inputVar, outputVar) ->
addActionMetadata(getActionNode(factory, action.getEventRef(), inputVar),
action));
+ (factory, inputVar, outputVar) ->
addActionMetadata(getActionNode(factory, action.getEventRef(), inputVar),
action), modelVar);
} else if (action.getSubFlowRef() != null) {
return filterAndMergeNode(embeddedSubProcess, collectVar,
fromExpr, resultExpr, toExpr, useData, shouldMerge,
- (factory, inputVar, outputVar) ->
addActionMetadata(getActionNode(factory, action.getSubFlowRef(), inputVar,
outputVar), action));
+ (factory, inputVar, outputVar) ->
addActionMetadata(getActionNode(factory, action.getSubFlowRef(), inputVar,
outputVar), action), modelVar);
} else {
return faultyNodeResult(embeddedSubProcess, "Action node " +
action.getName() + " of state " + state.getName() + " does not have function or
event defined");
}
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
index 005f754a86..bf2548126e 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java
@@ -31,7 +31,6 @@ import org.jbpm.workflow.core.node.Join;
import org.jbpm.workflow.core.node.Split;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.kie.kogito.serverless.workflow.parser.ParserContext;
-import
org.kie.kogito.serverless.workflow.parser.handlers.StateHandler.FilterableNodeSupplier;
import org.kie.kogito.serverless.workflow.suppliers.SetCollectorActionSupplier;
import
org.kie.kogito.serverless.workflow.suppliers.SetExpressionActionSupplier;
import org.kie.kogito.serverless.workflow.suppliers.SetValueActionSupplier;
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/ParallelHandler.java
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/ParallelHandler.java
index db07cbdf22..125245a715 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/ParallelHandler.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/ParallelHandler.java
@@ -18,8 +18,14 @@
*/
package org.kie.kogito.serverless.workflow.parser.handlers;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
import org.jbpm.ruleflow.core.Metadata;
import org.jbpm.ruleflow.core.RuleFlowNodeContainerFactory;
+import org.jbpm.ruleflow.core.factory.AbstractCompositeNodeFactory;
import org.jbpm.ruleflow.core.factory.CompositeContextNodeFactory;
import org.jbpm.ruleflow.core.factory.JoinFactory;
import org.jbpm.ruleflow.core.factory.NodeFactory;
@@ -31,7 +37,9 @@ import org.kie.kogito.process.expr.ExpressionHandlerFactory;
import org.kie.kogito.serverless.workflow.SWFConstants;
import org.kie.kogito.serverless.workflow.parser.ParserContext;
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;
+import
org.kie.kogito.serverless.workflow.suppliers.CloneVariableActionSupplier;
import
org.kie.kogito.serverless.workflow.suppliers.ExpressionReturnValueEvaluatorSupplier;
+import org.kie.kogito.serverless.workflow.suppliers.MergeActionSupplier;
import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.actions.Action;
@@ -39,6 +47,8 @@ import io.serverlessworkflow.api.branches.Branch;
import io.serverlessworkflow.api.states.ParallelState;
import io.serverlessworkflow.api.states.ParallelState.CompletionType;
+import static
org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR;
+
public class ParallelHandler extends
CompositeContextNodeHandler<ParallelState> {
protected ParallelHandler(ParallelState state, Workflow workflow,
ParserContext parserContext) {
@@ -49,7 +59,7 @@ public class ParallelHandler extends
CompositeContextNodeHandler<ParallelState>
@Override
public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory)
{
- SplitFactory<?> nodeFactory =
factory.splitNode(parserContext.newId()).name(state.getName() +
ServerlessWorkflowParser.NODE_START_NAME).type(Split.TYPE_AND);
+ SplitFactory<?> splitFactory =
factory.splitNode(parserContext.newId()).name(state.getName() +
ServerlessWorkflowParser.NODE_START_NAME).type(Split.TYPE_AND);
JoinFactory<?> connectionNode =
factory.joinNode(parserContext.newId()).name(state.getName() +
ServerlessWorkflowParser.NODE_END_NAME);
CompletionType completionType = state.getCompletionType();
if (completionType == CompletionType.ALL_OF) {
@@ -60,17 +70,45 @@ public class ParallelHandler extends
CompositeContextNodeHandler<ParallelState>
connectionNode.type(numCompleted);
if (ExpressionHandlerFactory.get(workflow.getExpressionLang(),
numCompleted).isValid()) {
connectionNode.metaData(Metadata.ACTION,
- new
ExpressionReturnValueEvaluatorSupplier(workflow.getExpressionLang(),
state.getNumCompleted(), SWFConstants.DEFAULT_WORKFLOW_VAR, Integer.class));
+ new
ExpressionReturnValueEvaluatorSupplier(workflow.getExpressionLang(),
state.getNumCompleted(), DEFAULT_WORKFLOW_VAR, Integer.class));
}
}
+
+ Set<String> branchVariables = new HashSet<>();
for (Branch branch : state.getBranches()) {
currentBranch = branch;
- CompositeContextNodeFactory<?> embeddedSubProcess =
handleActions(makeCompositeNode(factory, getName(branch)), branch.getActions());
+ String branchVarName = getVarName();
+ branchVariables.add(branchVarName);
+ CompositeContextNodeFactory<?> embeddedSubProcess =
+ handleActions(makeCompositeNode(factory, getName(branch)),
branch.getActions(), null, true, branchVarName);
handleErrors(factory, embeddedSubProcess);
WorkflowElementIdentifier branchId =
embeddedSubProcess.getNode().getId();
-
embeddedSubProcess.done().connection(nodeFactory.getNode().getId(),
branchId).connection(branchId, connectionNode.getNode().getId());
+
embeddedSubProcess.done().connection(splitFactory.getNode().getId(),
branchId).connection(branchId, connectionNode.getNode().getId());
}
- return new MakeNodeResult(nodeFactory, connectionNode);
+
+ Iterator<String> iter = branchVariables.iterator();
+ NodeFactory<?, ?> startNode;
+ if (iter.hasNext()) {
+ startNode = factory.actionNode(parserContext.newId())
+ .action(new
CloneVariableActionSupplier(DEFAULT_WORKFLOW_VAR, iter.next()));
+ NodeFactory<?, ?> currentNode = startNode;
+ while (iter.hasNext()) {
+ currentNode = connect(currentNode,
factory.actionNode(parserContext.newId())
+ .action(new
CloneVariableActionSupplier(DEFAULT_WORKFLOW_VAR, iter.next())));
+ }
+ connect(currentNode, splitFactory);
+ } else {
+ startNode = splitFactory;
+ }
+ return new MakeNodeResult(startNode, connectionNode);
+ }
+
+ @Override
+ protected <T extends AbstractCompositeNodeFactory<?, ?>> NodeFactory<?, ?>
handleActions(T embeddedSubProcess, NodeFactory<?, ?> currentNode, List<Action>
actions, String outputVar,
+ boolean shouldMerge, String modelVar) {
+ currentNode = super.handleActions(embeddedSubProcess, currentNode,
actions, outputVar, shouldMerge, modelVar);
+ return connect(currentNode,
embeddedSubProcess.actionNode(parserContext.newId())
+ .action(new MergeActionSupplier(modelVar,
DEFAULT_WORKFLOW_VAR)));
}
private String getName(Branch branch) {
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
index ca331c15df..ab530955fb 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/StateHandler.java
@@ -51,7 +51,6 @@ import org.kie.kogito.internal.utils.KogitoTags;
import org.kie.kogito.serverless.workflow.SWFConstants;
import org.kie.kogito.serverless.workflow.parser.ParserContext;
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;
-import org.kie.kogito.serverless.workflow.parser.VariableInfo;
import org.kie.kogito.serverless.workflow.suppliers.CollectorActionSupplier;
import org.kie.kogito.serverless.workflow.suppliers.CompensationActionSupplier;
import
org.kie.kogito.serverless.workflow.suppliers.ErrorExpressionActionSupplier;
@@ -439,50 +438,44 @@ public abstract class StateHandler<S extends State> {
toExpr = eventFilter.getToStateData();
useData = eventFilter.isUseData();
}
- return filterAndMergeNode(embeddedSubProcess, isStartState ? new
VariableInfo(DEFAULT_WORKFLOW_VAR, varName) : new VariableInfo(varName,
varName), null, dataExpr, toExpr, useData, true,
- nodeSupplier);
+ return filterAndMergeNode(embeddedSubProcess, varName, null, dataExpr,
toExpr, useData, true,
+ nodeSupplier, DEFAULT_WORKFLOW_VAR);
}
protected boolean isTempVariable(String varName) {
- return !varName.equals(ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR);
+ return !varName.equals(DEFAULT_WORKFLOW_VAR);
}
protected final MakeNodeResult
filterAndMergeNode(RuleFlowNodeContainerFactory<?, ?> embeddedSubProcess,
String actionVarName, String fromStateExpr, String resultExpr, String
toStateExpr,
boolean useData,
- boolean shouldMerge, FilterableNodeSupplier nodeSupplier) {
- return filterAndMergeNode(embeddedSubProcess, new
VariableInfo(actionVarName, actionVarName), fromStateExpr, resultExpr,
toStateExpr, useData, shouldMerge, nodeSupplier);
- }
-
- protected final MakeNodeResult
filterAndMergeNode(RuleFlowNodeContainerFactory<?, ?> embeddedSubProcess,
VariableInfo variableInfo, String fromStateExpr, String resultExpr, String
toStateExpr,
- boolean useData,
- boolean shouldMerge, FilterableNodeSupplier nodeSupplier) {
- String actionVarName = variableInfo.getOutputVar();
+ boolean shouldMerge, FilterableNodeSupplier nodeSupplier, String
modelVar) {
if (isTempVariable(actionVarName)) {
embeddedSubProcess.variable(actionVarName, new
ObjectDataType(JsonNode.class.getCanonicalName()),
Map.of(KogitoTags.VARIABLE_TAGS, KogitoTags.INTERNAL_TAG));
}
NodeFactory<?, ?> startNode, currentNode;
if (fromStateExpr != null) {
startNode =
embeddedSubProcess.actionNode(parserContext.newId()).action(ExpressionActionSupplier.of(workflow,
fromStateExpr)
- .withVarNames(DEFAULT_WORKFLOW_VAR,
actionVarName).build()).metaData(SWFConstants.STATE_NAME, state.getName());
+ .withVarNames(modelVar,
actionVarName).build()).metaData(SWFConstants.STATE_NAME, state.getName());
currentNode = connect(startNode,
nodeSupplier.apply(embeddedSubProcess, actionVarName,
actionVarName).metaData(SWFConstants.STATE_NAME, state.getName()));
} else {
- startNode = currentNode = nodeSupplier.apply(embeddedSubProcess,
DEFAULT_WORKFLOW_VAR, actionVarName);
+ startNode = currentNode = nodeSupplier.apply(embeddedSubProcess,
modelVar, actionVarName);
}
if (useData) {
if (resultExpr != null) {
currentNode = connect(currentNode,
embeddedSubProcess.actionNode(parserContext.newId()).action(ExpressionActionSupplier.of(workflow,
resultExpr)
- .withVarNames(variableInfo.getInputVar(),
actionVarName).build()));
+ .withVarNames(actionVarName, actionVarName).build()));
}
if (toStateExpr != null) {
currentNode = connect(currentNode,
embeddedSubProcess.actionNode(parserContext.newId())
- .action(new
CollectorActionSupplier(workflow.getExpressionLang(), toStateExpr,
DEFAULT_WORKFLOW_VAR, actionVarName)));
+ .action(new
CollectorActionSupplier(workflow.getExpressionLang(), toStateExpr, modelVar,
actionVarName)));
} else if (shouldMerge) {
currentNode = connect(currentNode,
embeddedSubProcess.actionNode(parserContext.newId())
- .action(new MergeActionSupplier(actionVarName,
DEFAULT_WORKFLOW_VAR)));
+ .action(new MergeActionSupplier(actionVarName,
modelVar)));
}
}
currentNode.done();
return new MakeNodeResult(startNode, currentNode);
+
}
protected final NodeFactory<?, ?> connect(NodeFactory<?, ?> currentNode,
NodeFactory<?, ?> nodeFactory) {
@@ -546,7 +539,7 @@ public abstract class StateHandler<S extends State> {
String errorMessage = metadata.get("errorMessage");
if (errorMessage != null && !errorMessage.isBlank()) {
NodeFactory<?, ?> errorMessageNode =
- factory.actionNode(parserContext.newId()).action(new
ErrorExpressionActionSupplier(workflow.getExpressionLang(), errorMessage,
SWFConstants.DEFAULT_WORKFLOW_VAR));
+ factory.actionNode(parserContext.newId()).action(new
ErrorExpressionActionSupplier(workflow.getExpressionLang(), errorMessage,
DEFAULT_WORKFLOW_VAR));
connect(errorMessageNode, startNode);
startNode = errorMessageNode;
}
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/CloneVariableActionSupplier.java
similarity index 77%
copy from
kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
copy to
kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/CloneVariableActionSupplier.java
index 6de50e1648..704ef424a6 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/CloneVariableActionSupplier.java
@@ -22,24 +22,21 @@ import org.jbpm.compiler.canonical.ExpressionSupplier;
import org.jbpm.compiler.canonical.ProcessMetaData;
import org.jbpm.compiler.canonical.descriptors.ExpressionUtils;
import org.kie.kogito.internal.process.runtime.KogitoNode;
-import org.kie.kogito.serverless.workflow.actions.MergeAction;
-import org.kie.kogito.serverless.workflow.actions.SetValueAction;
+import org.kie.kogito.serverless.workflow.actions.CloneVariableAction;
-import com.fasterxml.jackson.databind.JsonNode;
import com.github.javaparser.ast.expr.Expression;
-public class SetValueActionSupplier extends SetValueAction implements
ExpressionSupplier {
+public class CloneVariableActionSupplier extends CloneVariableAction
implements ExpressionSupplier {
private Expression expression;
- public SetValueActionSupplier(String varName, JsonNode value) {
- super(varName, value);
- this.expression =
ExpressionUtils.getObjectCreationExpr(MergeAction.class, varName, value);
+ public CloneVariableActionSupplier(String sourceName, String targetName) {
+ super(sourceName, targetName);
+ this.expression =
ExpressionUtils.getObjectCreationExpr(CloneVariableAction.class, sourceName,
targetName);
}
@Override
public Expression get(KogitoNode node, ProcessMetaData metadata) {
return expression;
-
}
}
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
index 6de50e1648..8b8ab6f843 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/suppliers/SetValueActionSupplier.java
@@ -22,7 +22,6 @@ import org.jbpm.compiler.canonical.ExpressionSupplier;
import org.jbpm.compiler.canonical.ProcessMetaData;
import org.jbpm.compiler.canonical.descriptors.ExpressionUtils;
import org.kie.kogito.internal.process.runtime.KogitoNode;
-import org.kie.kogito.serverless.workflow.actions.MergeAction;
import org.kie.kogito.serverless.workflow.actions.SetValueAction;
import com.fasterxml.jackson.databind.JsonNode;
@@ -34,7 +33,7 @@ public class SetValueActionSupplier extends SetValueAction
implements Expression
public SetValueActionSupplier(String varName, JsonNode value) {
super(varName, value);
- this.expression =
ExpressionUtils.getObjectCreationExpr(MergeAction.class, varName, value);
+ this.expression =
ExpressionUtils.getObjectCreationExpr(SetValueAction.class, varName, value);
}
@Override
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
index be9fab9961..c6e26e0d9d 100644
---
a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/test/java/org/kie/kogito/serverless/workflow/executor/StaticFluentWorkflowApplicationTest.java
@@ -185,6 +185,25 @@ public class StaticFluentWorkflowApplicationTest {
}
}
+ @Test
+ void testParallelIsolation() {
+ final String INC = "inc";
+
+ try (StaticWorkflowApplication application =
StaticWorkflowApplication.create()) {
+ Workflow workflow = workflow("ParallelTest").function(expr(INC,
".input=.input+1"))
+ .start(parallel()
+ .atLeast(2)
+ .newBranch().action(call(INC)).endBranch()
+ .newBranch().action(call(INC)).endBranch()
+ .newBranch().action(call(INC)).endBranch())
+ .end().build();
+
+ Process<JsonNodeModel> process = application.process(workflow);
+ JsonNode result = application.execute(process,
Collections.singletonMap("input", 4)).getWorkflowdata();
+ assertThat(result.get("input").asInt()).isEqualTo(5);
+ }
+ }
+
public int duplicate(int number) {
return number * 2;
}
diff --git
a/kogito-serverless-workflow/kogito-serverless-workflow-runtime/src/main/java/org/kie/kogito/serverless/workflow/actions/CloneVariableAction.java
b/kogito-serverless-workflow/kogito-serverless-workflow-runtime/src/main/java/org/kie/kogito/serverless/workflow/actions/CloneVariableAction.java
new file mode 100644
index 0000000000..f236622259
--- /dev/null
+++
b/kogito-serverless-workflow/kogito-serverless-workflow-runtime/src/main/java/org/kie/kogito/serverless/workflow/actions/CloneVariableAction.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.serverless.workflow.actions;
+
+import org.jbpm.process.core.datatype.impl.coverter.CloneHelper;
+import org.jbpm.process.instance.impl.Action;
+import org.kie.kogito.internal.process.runtime.KogitoProcessContext;
+
+public class CloneVariableAction implements Action {
+
+ private final String sourceName;
+ private final String targetName;
+
+ public CloneVariableAction(String sourceName, String targetName) {
+ this.sourceName = sourceName;
+ this.targetName = targetName;
+ }
+
+ @Override
+ public void execute(KogitoProcessContext context) throws Exception {
+ context.setVariable(targetName,
CloneHelper.get().clone(context.getVariable(sourceName)));
+ }
+}
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/parallel.sw.json
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/parallel.sw.json
index fd6ce01c2e..5264f59aac 100644
---
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/parallel.sw.json
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/parallel.sw.json
@@ -6,26 +6,26 @@
"start": "Parallel",
"functions": [
{
- "name": "concatA",
+ "name": "partA",
"type": "expression",
- "operation": ".result|=.+\"A\""
+ "operation": ".firstPart=\"A\""
},
{
- "name": "concatB",
+ "name": "partB",
"type": "expression",
- "operation": ".result|=.+\"B\""
+ "operation": ".secondPart=\"B\""
},
{
- "name": "concatC",
+ "name": "partC",
"type": "expression",
- "operation": ".result|=.+\"C\""
+ "operation": ".thirdPart=\"C\""
}
],
"states":[
{
"name":"Parallel",
"type":"parallel",
- "branches": [ {"actions": [{"functionRef":"concatA"}] }, {"actions":
[{"functionRef":"concatB"}] },{"actions": [{"functionRef":"concatC"}] }],
+ "branches": [ {"actions": [{"functionRef":"partA"}] }, {"actions":
[{"functionRef":"partB"}] },{"actions": [{"functionRef":"partC"}] }],
"numCompleted" : ".numCompleted",
"completionType" : "atLeast",
"end": {
diff --git
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/ParallelStateIT.java
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/ParallelStateIT.java
index 9bb5e1644f..0c90485b7b 100644
---
a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/ParallelStateIT.java
+++
b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/ParallelStateIT.java
@@ -18,13 +18,17 @@
*/
package org.kie.kogito.quarkus.workflows;
+import java.util.Map;
+
import org.junit.jupiter.api.Test;
import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.restassured.common.mapper.TypeRef;
import io.restassured.http.ContentType;
import static io.restassured.RestAssured.given;
-import static org.hamcrest.Matchers.hasLength;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.is;
@QuarkusIntegrationTest
class ParallelStateIT {
@@ -38,18 +42,24 @@ class ParallelStateIT {
.post("/parallel")
.then()
.statusCode(201)
- .body("workflowdata.result", hasLength(3));
+ .body("workflowdata.firstPart", is("A"))
+ .body("workflowdata.secondPart", is("B"))
+ .body("workflowdata.thirdPart", is("C"));
}
@Test
void testPartialParallelRest() {
- given()
+ Map<String, Object> result = given()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.body("{\"workflowdata\":{\"numCompleted\": 2}}").when()
.post("/parallel")
.then()
.statusCode(201)
- .body("workflowdata.result", hasLength(2));
+ .extract().body().as(new TypeRef<Map<String, Object>>() {
+ });
+ assertThat(result).containsKey("workflowdata");
+ Map<String, Object> workflowData = (Map<String, Object>)
result.get("workflowdata");
+ assertThat(workflowData).hasSize(3);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]