This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 1e1ff7f49135ca2c0b964f9c11901977f733c928 Author: Alex Heneveld <[email protected]> AuthorDate: Sun Jun 11 01:06:03 2023 +0100 workflow scratch vars being set in an updateable way --- .../core/workflow/WorkflowExecutionContext.java | 105 +++++++++++++++------ .../core/workflow/steps/CustomWorkflowStep.java | 8 +- .../workflow/steps/flow/ForeachWorkflowStep.java | 4 +- .../steps/variables/ClearVariableWorkflowStep.java | 2 +- .../workflow/steps/variables/LoadWorkflowStep.java | 2 +- .../steps/variables/SetVariableWorkflowStep.java | 4 +- .../workflow/steps/variables/WaitWorkflowStep.java | 2 +- .../brooklyn/core/workflow/WorkflowBasicTest.java | 6 +- .../workflow/WorkflowPersistReplayErrorsTest.java | 2 +- 9 files changed, 92 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index f7bcf594f4..41b714f9ce 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -220,8 +220,10 @@ public class WorkflowExecutionContext { WorkflowStepInstanceExecutionContext context; /** is step replayable */ Boolean replayableFromHere; - /** scratch for last _started_ instance of step */ + /** partial scratch for last _started_ instance of step */ Map<String,Object> workflowScratch; + Map<String,Object> workflowScratch2222start; + Map<String,Object> workflowScratch222UpdatesHere; /** steps that immediately preceded this, updated when _this_ step started, with most recent first */ Set<Integer> previous; /** steps that immediately followed this, updated when _next_ step started, with most recent first */ @@ -234,6 +236,7 @@ public class WorkflowExecutionContext { // note: when persisted, this may be omitted and restored from the oldStepInfo map in the Converter Map<String,Object> workflowScratchVariables = MutableMap.of(); transient Map<String,Object> workflowScratchVariables2222; + transient Map<String,Object> workflowScratchVariablesUpdatedThisStep; @JsonInclude(JsonInclude.Include.NON_EMPTY) Map<String,List<Instant>> retryRecords = MutableMap.of(); @@ -398,14 +401,31 @@ public class WorkflowExecutionContext { public Map<String, Object> getWorkflowScratchVariables() { if (workflowScratchVariables2222==null) { Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null); - if (prev != null) workflowScratchVariables2222=prev.getLeft(); + workflowScratchVariables2222=prev.getLeft(); } - if (workflowScratchVariables2222==null) { - workflowScratchVariables2222 = MutableMap.of(); + checkEqual(workflowScratchVariables, workflowScratchVariables2222); + return MutableMap.copyOf(workflowScratchVariables).asUnmodifiable(); + } + + public Object updateWorkflowScratchVariable(String s, Object v) { + if (workflowScratchVariables2222==null) getWorkflowScratchVariables(); + workflowScratchVariables2222.put(s, v); + Object old = workflowScratchVariables.put(s, v); + if (v==null) workflowScratchVariables.remove(s); + if (v==null) workflowScratchVariables2222.remove(s); + if (workflowScratchVariablesUpdatedThisStep==null) workflowScratchVariablesUpdatedThisStep = MutableMap.of(); + workflowScratchVariablesUpdatedThisStep.put(s, v); + return old; + } + + public void updateWorkflowScratchVariables(Map<String,Object> newValues) { + if (newValues!=null && !newValues.isEmpty()) { + if (workflowScratchVariables2222==null) getWorkflowScratchVariables(); + workflowScratchVariables2222.putAll(newValues); + workflowScratchVariables.putAll(newValues); + if (workflowScratchVariablesUpdatedThisStep==null) workflowScratchVariablesUpdatedThisStep = MutableMap.of(); + workflowScratchVariablesUpdatedThisStep.putAll(newValues); } - // TODO - //checkEqual(workflowScratchVariables, workflowScratchVariables2222); - return workflowScratchVariables; } public Map<String, List<Instant>> getRetryRecords() { @@ -722,14 +742,24 @@ public class WorkflowExecutionContext { public Pair<Map<String,Object>,Set<Integer>> getStepWorkflowScratchAndBacktrackedSteps(Integer stepOrNullForPrevious) { Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious; Set<Integer> previousSteps = MutableSet.of(); + Map<String,Object> result = MutableMap.of(); + boolean includeUpdates = stepOrNullForPrevious==null; // exclude first update if getting at an explicit step while (prevSI!=null && previousSteps.add(prevSI)) { OldStepRecord last = oldStepInfo.get(prevSI); if (last==null) break; - if (last.workflowScratch!=null) return Pair.of(last.workflowScratch, previousSteps); + if (includeUpdates && last.workflowScratch222UpdatesHere!=null) { + result = MutableMap.copyOf(last.workflowScratch222UpdatesHere).add(result); + } + includeUpdates = true; + if (last.workflowScratch2222start!=null) { + result = MutableMap.copyOf(last.workflowScratch2222start).add(result); + result.entrySet().stream().filter(e -> e.getValue()==null).map(Map.Entry::getKey).forEach(result::remove); + break; + } if (last.previous==null || last.previous.isEmpty()) break; prevSI = last.previous.iterator().next(); } - return null; + return Pair.of(result, previousSteps); } public Object getOutput() { @@ -1032,7 +1062,6 @@ public class WorkflowExecutionContext { if (currentStepInstance==null || currentStepInstance.getStepIndex()!=currentStepIndex) { throw new IllegalStateException("Running workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance); } - updateOldNextStepOnThisStepStarting(); updateStepOutput(currentStepInstance, null); currentStepInstance.injectContext(WorkflowExecutionContext.this); @@ -1234,8 +1263,7 @@ public class WorkflowExecutionContext { } OldStepRecord last = oldStepInfo.get(step); if (last != null) { - Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(step); - if (prev!=null) workflowScratchVariables = prev.getLeft(); + workflowScratchVariables = getStepWorkflowScratchAndBacktrackedSteps(step).getLeft(); previousStepIndex = last.previous==null ? null : last.previous.stream().findFirst().orElse(null); } else { @@ -1283,9 +1311,7 @@ public class WorkflowExecutionContext { resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false); } if (continuationInstructions.customWorkflowScratchVariables!=null) { - getWorkflowScratchVariables(); - workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables); - workflowScratchVariables2222.putAll(continuationInstructions.customWorkflowScratchVariables); + updateWorkflowScratchVariables(continuationInstructions.customWorkflowScratchVariables); } } @@ -1394,13 +1420,22 @@ public class WorkflowExecutionContext { t = step.newTask(currentStepInstance); } + updateOldNextStepOnThisStepStarting(); + // about to run -- checkpoint noting current and previous steps, and updating replayable from info OldStepRecord currentStepRecord = oldStepInfo.compute(currentStepIndex, (index, old) -> { if (old == null) old = new OldStepRecord(); old.countStarted++; - if (!workflowScratchVariables.isEmpty()) + + // no longer necessary because can recompute, and any specials needed at start will have been applied in one of the updateOldNextStepOnThisStepStarting + if (!workflowScratchVariables.isEmpty()) { old.workflowScratch = MutableMap.copyOf(workflowScratchVariables); - else old.workflowScratch = null; + } else { + old.workflowScratch = null; + } + + old.workflowScratch222UpdatesHere = null; + old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous); old.previousTaskId = previousStepTaskId; old.nextTaskId = null; @@ -1434,6 +1469,18 @@ public class WorkflowExecutionContext { currentStepInstance.output2222 = null; } } + if (workflowScratchVariablesUpdatedThisStep!=null && !workflowScratchVariablesUpdatedThisStep.isEmpty()) { + currentStepRecord.workflowScratch222UpdatesHere = workflowScratchVariablesUpdatedThisStep; + } + if (currentStepRecord.workflowScratch2222start != null) { + // if we are repeating, check if we need to keep what we were repeating + Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null); + if (prev!=null && !prev.getRight().contains(currentStepIndex) && Objects.equals(prev.getLeft(), currentStepRecord.workflowScratch2222start)){ + currentStepRecord.workflowScratch2222start = null; + } + } + + workflowScratchVariablesUpdatedThisStep = null; }; // now run the step @@ -1549,21 +1596,25 @@ public class WorkflowExecutionContext { private void updateOldNextStepOnThisStepStarting() { // at step start, we update the _next_ record to have a copy of our old output and workflow vars OldStepRecord old = oldStepInfo.get(currentStepInstance.stepIndex); - if (old.next!=null && !old.next.isEmpty()) { + if (old!=null && old.next!=null && !old.next.isEmpty()) { Integer lastNext = old.next.iterator().next(); OldStepRecord oldNext = oldStepInfo.get(lastNext); - if (oldNext!=null) { - // below will access the _previous_ StepInstanceExecutionContext, as oldStepRecord.context is update at end of step - if (old.context.output2222!=null && oldNext.context.output2222==null) { + if (oldNext!=null && oldNext.context!=null) { + // if oldNext has no context then we never ran it, so we aren't repeating, we're replaying + if (oldNext.context.output2222==null) { + // below will access the _previous_ StepInstanceExecutionContext, as oldStepRecord.context is update at end of step // thus below gets the _previous_ output known at this step, saving it in the next - oldNext.context.output2222 = currentStepInstance.output2222; + oldNext.context.output2222 = old.context.output2222; // Pair<Object, Set<Integer>> prevOutput = getStepOutputAndBacktrackedSteps(currentStepInstance.stepIndex); // if (prevOutput != null) { // oldNext.context.output2222 = prevOutput.getLeft(); // } } - // TODO workflow vars + oldNext.workflowScratch2222start = + getWorkflowScratchVariables() + //getStepWorkflowScratchAndBacktrackedSteps(lastNext).getLeft() + ; } } } @@ -1612,12 +1663,10 @@ public class WorkflowExecutionContext { public static class Converter implements com.fasterxml.jackson.databind.util.Converter<WorkflowExecutionContext,WorkflowExecutionContext> { @Override public WorkflowExecutionContext convert(WorkflowExecutionContext value) { - if (value.workflowScratchVariables==null || value.workflowScratchVariables.isEmpty()) { - Pair<Map<String, Object>, Set<Integer>> prev = value.getStepWorkflowScratchAndBacktrackedSteps(null); - if (prev!=null && prev.getLeft()!=null) value.workflowScratchVariables = prev.getLeft(); + if (value.workflowScratchVariables2222==null || value.workflowScratchVariables2222.isEmpty()) { + value.workflowScratchVariables2222 = value.getStepWorkflowScratchAndBacktrackedSteps(null).getLeft(); } - // note: no special handling for output; currently we set that as null as we go along - + // note: no special handling needed for output; it is derived from the last non-null step output return value; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index d2580ea915..7eb66005e6 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -408,7 +408,7 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl WorkflowExecutionContext wc = p.getLeft(); if (wc.getCurrentStepIndex()==null || wc.getCurrentStepIndex()==WorkflowExecutionContext.STEP_INDEX_FOR_START) { // initialize to last if it hasn't started - wc.getWorkflowScratchVariables().putAll(reducingV); + wc.updateWorkflowScratchVariables(reducingV); } DynamicTasks.queue(p.getRight()).getUnchecked(); @@ -428,7 +428,7 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl returnValue = !wasList ? Iterables.getOnlyElement(result) : result; } else { context.setOutput(reducingV); - context.getWorkflowExectionContext().getWorkflowScratchVariables().putAll(reducingV); + context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV); returnValue = reducingV; } @@ -543,7 +543,7 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl String tivn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_index_var_name, String.class); - if (targetIndexOrNull!=null) nestedWorkflowContext.getWorkflowScratchVariables().put(tivn==null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNull); + if (targetIndexOrNull!=null) nestedWorkflowContext.updateWorkflowScratchVariable(tivn==null ? TARGET_INDEX_VAR_NAME_DEFAULT : tivn, targetIndexOrNull); initializeSubWorkflowForTarget(context, target, nestedWorkflowContext); return nestedWorkflowContext; @@ -551,7 +551,7 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl protected void initializeSubWorkflowForTarget(WorkflowStepInstanceExecutionContext context, Object target, WorkflowExecutionContext nestedWorkflowContext) { String tvn = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, target_var_name, String.class); - nestedWorkflowContext.getWorkflowScratchVariables().put(tvn==null ? TARGET_VAR_NAME_DEFAULT : tvn, target); + nestedWorkflowContext.updateWorkflowScratchVariable(tvn==null ? TARGET_VAR_NAME_DEFAULT : tvn, target); } /** Returns a top-level workflow running the workflow defined here */ diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java index 4d9879b72b..3c179bc19a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java @@ -65,10 +65,10 @@ public class ForeachWorkflowStep extends CustomWorkflowStep { if (tvn.startsWith("{") && tvn.endsWith("}")) { String[] spreadVars = tvn.substring(1, tvn.length() - 1).split(","); if (!(target instanceof Map)) throw new IllegalStateException("Spread vars indicated in foreach but target is not a map"); - nestedWorkflowContext.getWorkflowScratchVariables().put(TARGET_VAR_NAME_DEFAULT, target); + nestedWorkflowContext.updateWorkflowScratchVariable(TARGET_VAR_NAME_DEFAULT, target); for (String spreadVar: spreadVars) { String svt = spreadVar.trim(); - nestedWorkflowContext.getWorkflowScratchVariables().put(svt, ((Map)target).get(svt)); + nestedWorkflowContext.updateWorkflowScratchVariable(svt, ((Map)target).get(svt)); } return; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java index 4ae6d21c0a..3623c472de 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/ClearVariableWorkflowStep.java @@ -43,7 +43,7 @@ public class ClearVariableWorkflowStep extends WorkflowStepDefinition { if (variable ==null) throw new IllegalArgumentException("Variable name is required"); String name = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_INPUT, variable.name, String.class); if (Strings.isBlank(name)) throw new IllegalArgumentException("Variable name is required"); - context.getWorkflowExectionContext().getWorkflowScratchVariables().remove(name); + context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, null); return context.getPreviousStepOutput(); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java index 197aebefce..da68001c05 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/LoadWorkflowStep.java @@ -89,7 +89,7 @@ public class LoadWorkflowStep extends WorkflowStepDefinition { Object resolvedValue = new SetVariableWorkflowStep.ConfigurableInterpolationEvaluation(context, type, data, context.getInputOrDefault(INTERPOLATION_MODE), context.getInputOrDefault(INTERPOLATION_ERRORS)).evaluate(); - context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue); + context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue); context.noteOtherMetadata("Loaded", ByteSizeStrings.java().makeSizeString(data.getBytes().length)+" from "+url+" into "+variable); return context.getPreviousStepOutput(); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java index 6531ba0cec..7da45ca46f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/SetVariableWorkflowStep.java @@ -143,9 +143,9 @@ public class SetVariableWorkflowStep extends WorkflowStepDefinition { throw new IllegalArgumentException("Invalid list index " + listIndex); } oldValue = l.set(listIndex, resolvedValue); - context.getWorkflowExectionContext().getWorkflowScratchVariables().put(listName, l); + context.getWorkflowExectionContext().updateWorkflowScratchVariable(listName, l); } else { - oldValue = context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue); + oldValue = context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue); } return oldValue; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java index db5be0b36e..3b597da68f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/variables/WaitWorkflowStep.java @@ -82,7 +82,7 @@ public class WaitWorkflowStep extends WorkflowStepDefinition { log.debug("Wait resolved after "+duration+", "+input.get(unresolvedValue)+" is: "+resolvedValue); if (name!=null) { - Object oldValue = context.getWorkflowExectionContext().getWorkflowScratchVariables().put(name, resolvedValue); + Object oldValue = context.getWorkflowExectionContext().updateWorkflowScratchVariable(name, resolvedValue); if (oldValue!=null) context.noteOtherMetadata("Previous value", oldValue); context.noteOtherMetadata("Value set", resolvedValue); return context.getPreviousStepOutput(); diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java index 128375e93b..b9bc866d9d 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java @@ -344,12 +344,12 @@ public class WorkflowBasicTest extends BrooklynMgmtUnitTestSupport { @Test public void testWorkflowResolutionScratchVariable() { - doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", "bar"), "${foo}", "bar"); + doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", "bar"), "${foo}", "bar"); } @Test public void testWorkflowResolutionScratchVariableCoerced() { - doTestOfTypedWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", "7"), "${foo}", "integer", 7); + doTestOfTypedWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", "7"), "${foo}", "integer", 7); } @Test @@ -359,7 +359,7 @@ public class WorkflowBasicTest extends BrooklynMgmtUnitTestSupport { @Test public void testWorkflowResolutionMore() { - doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().getWorkflowScratchVariables().put("foo", MutableList.of("baz", "bar")), "${foo[1]}", "bar"); + doTestOfWorkflowVariable(context -> context.getWorkflowExectionContext().updateWorkflowScratchVariable("foo", MutableList.of("baz", "bar")), "${foo[1]}", "bar"); doTestOfWorkflowVariable(context -> context.getEntity().config().set(ConfigKeys.newConfigKey(Object.class, "foo"), MutableMap.of("bar", "baz")), "${entity.config.foo.bar}", "baz"); } diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java index b788413d4a..d95b053402 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java @@ -873,7 +873,7 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl Asserts.assertEquals(wp.get().getWorkflows(app).keySet(), MutableSet.of(w1.getWorkflowId(), w2.getWorkflowId(), w3.getWorkflowId(), w4.getWorkflowId())); // should replace the one above } - @Test(groups="Integration") + @Test(groups="Integration") // very slow public void testRetentionManyWaysIncludingDisabled() throws Exception { app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
