This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit af91bd77d3a48926e0812af1c8e38e93250c92b5 Author: Alex Heneveld <[email protected]> AuthorDate: Sat Jun 10 09:42:35 2023 +0100 alternative way to compute/store output more efficiently output working, scratch WIP --- .../core/workflow/WorkflowExecutionContext.java | 134 ++++++++++++++++----- .../workflow/WorkflowExpressionResolution.java | 9 +- .../WorkflowStepInstanceExecutionContext.java | 3 +- 3 files changed, 111 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index 61d6e59f6f..f7bcf594f4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.*; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.type.TypeFactory; -import com.fasterxml.jackson.databind.util.Converter; import com.google.common.collect.Iterables; import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; @@ -147,8 +146,33 @@ public class WorkflowExecutionContext { Map<String,Object> inputResolved = MutableMap.of(); Object outputDefinition; + /** final output of the workflow, set at end */ Object output; + /* + * Tricks to keep size of persisted/serialized data down: + * + * * this.output set at the end (as a copy) + * NEW: only set if different, on lookup if null, if finished, get from last step + * * step.output set on completion of each step (copying previous if no explicit output) +// * - if never run +// * - if different to previous step, set +// * - if same as previous step, set null +// * - if run before +// * - if old value equals new value, do nothing +// * - if old value different + * - if new value is same as previous step, set null, else set new value + * - & if last run's next step output was null, set it to the old value here + * NEW: only set if different to previous step or not recoverable + * - if different to previous step, set + * - if there was a previous instance of this step: if we are different to previous instance of this step, set + * - if same. has this step + * + * * this.workflowScratch - currently set on context dynamically, copied to oldStepInfo + * NEW 1: set on context and oldStepInfo dynamically but null if same as previous, retrieved looking up previous + * NEW 2: add up all the previous until it repeats + */ + Object lock; Duration timeout; @@ -209,6 +233,7 @@ public class WorkflowExecutionContext { // note: when persisted, this may be omitted and restored from the oldStepInfo map in the Converter Map<String,Object> workflowScratchVariables = MutableMap.of(); + transient Map<String,Object> workflowScratchVariables2222; @JsonInclude(JsonInclude.Include.NON_EMPTY) Map<String,List<Instant>> retryRecords = MutableMap.of(); @@ -371,12 +396,15 @@ public class WorkflowExecutionContext { @JsonIgnore public Map<String, Object> getWorkflowScratchVariables() { - return workflowScratchVariables; - } - @JsonGetter("workflowScratchVariables") - public Map<String, Object> getWorkflowScratchVariablesForPersistence() { - Pair<Map<String, Object>, Set<Integer>> prev = getPreviousStepWorkflowScratchAndBacktrackedSteps(); - if (prev!=null && Objects.equals(prev.getLeft(), workflowScratchVariables)) return null; + if (workflowScratchVariables2222==null) { + Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(null); + if (prev != null) workflowScratchVariables2222=prev.getLeft(); + } + if (workflowScratchVariables2222==null) { + workflowScratchVariables2222 = MutableMap.of(); + } + // TODO + //checkEqual(workflowScratchVariables, workflowScratchVariables2222); return workflowScratchVariables; } @@ -670,15 +698,15 @@ public class WorkflowExecutionContext { @JsonIgnore public Object getPreviousStepOutput() { - Pair<Object, Set<Integer>> p = getPreviousStepOutputAndBacktrackedSteps(); + Pair<Object, Set<Integer>> p = getStepOutputAndBacktrackedSteps(null); if (p==null) return null; return p.getLeft(); } @JsonIgnore - public Pair<Object,Set<Integer>> getPreviousStepOutputAndBacktrackedSteps() { - if (lastErrorHandlerOutput!=null) return Pair.of(lastErrorHandlerOutput,null); + Pair<Object,Set<Integer>> getStepOutputAndBacktrackedSteps(Integer stepOrNullForPrevious) { + if (stepOrNullForPrevious==null && lastErrorHandlerOutput!=null) return Pair.of(lastErrorHandlerOutput,null); - Integer prevSI = previousStepIndex; + Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious; Set<Integer> previousSteps = MutableSet.of(); while (prevSI!=null && previousSteps.add(prevSI)) { OldStepRecord last = oldStepInfo.get(prevSI); @@ -691,14 +719,14 @@ public class WorkflowExecutionContext { } @JsonIgnore - public Pair<Map<String,Object>,Set<Integer>> getPreviousStepWorkflowScratchAndBacktrackedSteps() { - Integer prevSI = previousStepIndex; + public Pair<Map<String,Object>,Set<Integer>> getStepWorkflowScratchAndBacktrackedSteps(Integer stepOrNullForPrevious) { + Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : stepOrNullForPrevious; Set<Integer> previousSteps = MutableSet.of(); while (prevSI!=null && previousSteps.add(prevSI)) { OldStepRecord last = oldStepInfo.get(prevSI); if (last==null) break; if (last.workflowScratch!=null) return Pair.of(last.workflowScratch, previousSteps); - if (last.previous.isEmpty()) break; + if (last.previous==null || last.previous.isEmpty()) break; prevSI = last.previous.iterator().next(); } return null; @@ -708,6 +736,13 @@ public class WorkflowExecutionContext { return output; } + public static void checkEqual(Object o1, Object o2) { + if (!Objects.equals(o1, o2)) { + log.warn("Objects different: " + o1 + " / " + o2); + throw new IllegalStateException("Objects different: " + o1 + " / " + o2); + } + } + public String getName() { return name; } @@ -997,7 +1032,8 @@ public class WorkflowExecutionContext { if (currentStepInstance==null || currentStepInstance.getStepIndex()!=currentStepIndex) { throw new IllegalStateException("Running workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance); } - currentStepInstance.output = null; + updateOldNextStepOnThisStepStarting(); + updateStepOutput(currentStepInstance, null); currentStepInstance.injectContext(WorkflowExecutionContext.this); log.debug("Replaying workflow '" + name + "', reusing instance " + currentStepInstance + " for step " + workflowStepReference(currentStepIndex) + ")"); @@ -1120,7 +1156,9 @@ public class WorkflowExecutionContext { errorHandled = true; currentStepInstance.next = WorkflowReplayUtils.getNext(result.next, STEP_TARGET_NAME_FOR_END); - if (result.output != null) output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class); + if (result.output != null) { + output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, result.output, Object.class); + } moveToNextStep("Handled error in workflow around step " + workflowStepReference(currentStepIndex), result.next==null); @@ -1177,13 +1215,13 @@ public class WorkflowExecutionContext { // record how it ended oldStepInfo.compute(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> { if (old == null) old = new OldStepRecord(); - old.next = MutableSet.<Integer>of(STEP_INDEX_FOR_END).putAll(old.next); + old.next = MutableSet.of(STEP_INDEX_FOR_END).putAll(old.next); old.nextTaskId = null; return old; }); oldStepInfo.compute(STEP_INDEX_FOR_END, (index, old) -> { if (old == null) old = new OldStepRecord(); - old.previous = MutableSet.<Integer>of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous); + old.previous = MutableSet.of(previousStepIndex == null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous); old.previousTaskId = previousStepTaskId; return old; }); @@ -1196,7 +1234,8 @@ public class WorkflowExecutionContext { } OldStepRecord last = oldStepInfo.get(step); if (last != null) { - workflowScratchVariables = last.workflowScratch; + Pair<Map<String, Object>, Set<Integer>> prev = getStepWorkflowScratchAndBacktrackedSteps(step); + if (prev!=null) workflowScratchVariables = prev.getLeft(); previousStepIndex = last.previous==null ? null : last.previous.stream().findFirst().orElse(null); } else { @@ -1209,6 +1248,7 @@ public class WorkflowExecutionContext { } // and ensure not null if (workflowScratchVariables == null) workflowScratchVariables = MutableMap.of(); + workflowScratchVariables2222 = workflowScratchVariables; } private void initializeFromContinuationInstructions(Integer replayFromStep) { @@ -1243,7 +1283,9 @@ public class WorkflowExecutionContext { resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false); } if (continuationInstructions.customWorkflowScratchVariables!=null) { + getWorkflowScratchVariables(); workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables); + workflowScratchVariables2222.putAll(continuationInstructions.customWorkflowScratchVariables); } } @@ -1289,7 +1331,7 @@ public class WorkflowExecutionContext { } private Object endWithSuccess() { - WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, output); + WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, task, getOutput()); persist(); return output; } @@ -1379,32 +1421,37 @@ public class WorkflowExecutionContext { persist(); - BiConsumer<Object,Object> onFinish = (output,overrideNext) -> { + BiConsumer<Object,Object> onFinish = (stepOutputDefinition,overrideNext) -> { currentStepInstance.next = WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step); - if (output!=null) currentStepInstance.output = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, output, Object.class); - - // optimization, clear the value here if we can simply take it from the previous step; - // taking care not to break over loops - Pair<Object, Set<Integer>> prev = getPreviousStepOutputAndBacktrackedSteps(); - if (prev!=null && prev.getRight()!=null && Objects.equals(currentStepInstance.output, prev.getLeft()) && !prev.getRight().contains(currentStepIndex)) { - currentStepInstance.output = null; + if (stepOutputDefinition!=null) { + Object outputResolved = resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT, stepOutputDefinition, Object.class); + updateStepOutput(currentStepInstance, outputResolved); + } + if (currentStepInstance.output2222 != null) { + Pair<Object, Set<Integer>> prev = getStepOutputAndBacktrackedSteps(null); + if (prev != null && Objects.equals(prev.getLeft(), currentStepInstance.output2222)) { + // optimization, clear the value here if we can simply take it from the previous step + currentStepInstance.output2222 = null; + } } }; // now run the step try { Duration duration = step.getTimeout(); + Object newOutput; if (duration!=null) { boolean isEnded = DynamicTasks.queue(t).blockUntilEnded(duration); if (isEnded) { - currentStepInstance.output = t.getUnchecked(); + newOutput = t.getUnchecked(); } else { t.cancel(true); throw new TimeoutException("Timeout after "+duration+": "+t.getDisplayName()); } } else { - currentStepInstance.output = DynamicTasks.queue(t).getUnchecked(); + newOutput = DynamicTasks.queue(t).getUnchecked(); } + updateStepOutput(currentStepInstance, newOutput); // allow output to be customized / overridden onFinish.accept(step.output, null); @@ -1496,6 +1543,31 @@ public class WorkflowExecutionContext { } } + private void updateStepOutput(WorkflowStepInstanceExecutionContext step, Object newOutput) { + step.output2222 = step.output = newOutput; + } + private void updateOldNextStepOnThisStepStarting() { + // at step start, we update the _next_ record to have a copy of our old output and workflow vars + OldStepRecord old = oldStepInfo.get(currentStepInstance.stepIndex); + if (old.next!=null && !old.next.isEmpty()) { + Integer lastNext = old.next.iterator().next(); + OldStepRecord oldNext = oldStepInfo.get(lastNext); + if (oldNext!=null) { + // below will access the _previous_ StepInstanceExecutionContext, as oldStepRecord.context is update at end of step + if (old.context.output2222!=null && oldNext.context.output2222==null) { + // thus below gets the _previous_ output known at this step, saving it in the next + oldNext.context.output2222 = currentStepInstance.output2222; +// Pair<Object, Set<Integer>> prevOutput = getStepOutputAndBacktrackedSteps(currentStepInstance.stepIndex); +// if (prevOutput != null) { +// oldNext.context.output2222 = prevOutput.getLeft(); +// } + } + + // TODO workflow vars + } + } + } + public Maybe<Pair<Integer,Boolean>> getIndexOfStepId(String next) { if (next==null) return Maybe.absent("Null step ID supplied"); Function<WorkflowExecutionContext, Integer> predefined = PREDEFINED_NEXT_TARGETS.get(next.toLowerCase()); @@ -1541,7 +1613,7 @@ public class WorkflowExecutionContext { @Override public WorkflowExecutionContext convert(WorkflowExecutionContext value) { if (value.workflowScratchVariables==null || value.workflowScratchVariables.isEmpty()) { - Pair<Map<String, Object>, Set<Integer>> prev = value.getPreviousStepWorkflowScratchAndBacktrackedSteps(); + Pair<Map<String, Object>, Set<Integer>> prev = value.getStepWorkflowScratchAndBacktrackedSteps(null); if (prev!=null && prev.getLeft()!=null) value.workflowScratchVariables = prev.getLeft(); } // note: no special handling for output; currently we set that as null as we go along diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java index d9fad13f78..f7612fb87f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java @@ -41,6 +41,7 @@ import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.javalang.Boxing; import org.apache.brooklyn.util.time.Time; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +113,7 @@ public class WorkflowExpressionResolution { } if ("output".equals(key)) { - if (context.output!=null) return TemplateProcessor.wrapAsTemplateModel(context.output); + if (context.getOutput()!=null) return TemplateProcessor.wrapAsTemplateModel(context.getOutput()); if (context.currentStepInstance!=null && context.currentStepInstance.output!=null) return TemplateProcessor.wrapAsTemplateModel(context.currentStepInstance.output); if (context.getPreviousStepOutput()!=null) return TemplateProcessor.wrapAsTemplateModel(context.getPreviousStepOutput()); return ifNoMatches(); @@ -230,7 +231,7 @@ public class WorkflowExpressionResolution { if ("error".equals(key)) return TemplateProcessor.wrapAsTemplateModel(errorHandlerContext!=null ? errorHandlerContext.error : null); if ("input".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.input); - if ("output".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.output); + if ("output".equals(key)) return TemplateProcessor.wrapAsTemplateModel(context.getOutput()); //current_step.yyy and previous_step.yyy (where yyy is any of the above) //step.xxx.yyy ? - where yyy is any of the above and xxx any step id @@ -290,7 +291,9 @@ public class WorkflowExpressionResolution { if ("input".equals(key)) return TemplateProcessor.wrapAsTemplateModel(step.input); if ("output".equals(key)) { - return TemplateProcessor.wrapAsTemplateModel(step.output != null ? step.output : MutableMap.of()); + Pair<Object, Set<Integer>> outputOfStep = context.getStepOutputAndBacktrackedSteps(step.stepIndex); + Object output = (outputOfStep != null && outputOfStep.getLeft() != null) ? outputOfStep.getLeft() : MutableMap.of(); + return TemplateProcessor.wrapAsTemplateModel(output); } return ifNoMatches(); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java index 1342a1e66e..43e0a308a5 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java @@ -28,7 +28,6 @@ import org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; -import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,6 +90,7 @@ public class WorkflowStepInstanceExecutionContext { Set<BrooklynTaskTags.WorkflowTaskTag> subWorkflows = MutableSet.of(); Object output; + Object output2222; @JsonInclude(JsonInclude.Include.NON_EMPTY) public Map<String,Object> otherMetadata = MutableMap.of(); @@ -166,6 +166,7 @@ public class WorkflowStepInstanceExecutionContext { } public void setOutput(Object output) { this.output = output; + this.output2222 = output; } @JsonIgnore
