This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit af91bd77d3a48926e0812af1c8e38e93250c92b5
Author: Alex Heneveld <[email protected]>
AuthorDate: Sat Jun 10 09:42:35 2023 +0100

    alternative way to compute/store output more efficiently
    
    output working, scratch WIP
---
 .../core/workflow/WorkflowExecutionContext.java    | 134 ++++++++++++++++-----
 .../workflow/WorkflowExpressionResolution.java     |   9 +-
 .../WorkflowStepInstanceExecutionContext.java      |   3 +-
 3 files changed, 111 insertions(+), 35 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 61d6e59f6f..f7bcf594f4 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.*;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.type.TypeFactory;
-import com.fasterxml.jackson.databind.util.Converter;
 import com.google.common.collect.Iterables;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
@@ -147,8 +146,33 @@ public class WorkflowExecutionContext {
     Map<String,Object> inputResolved = MutableMap.of();
 
     Object outputDefinition;
+    /** final output of the workflow, set at end */
     Object output;
 
+    /*
+     * Tricks to keep size of persisted/serialized data down:
+     *
+     * * this.output set at the end (as a copy)
+     *   NEW: only set if different, on lookup if null, if finished, get from 
last step
+     * * step.output set on completion of each step (copying previous if no 
explicit output)
+//     *   - if never run
+//     *     - if different to previous step, set
+//     *     - if same as previous step, set null
+//     *   - if run before
+//     *     - if old value equals new value, do nothing
+//     *     - if old value different
+     *       - if new value is same as previous step, set null, else set new 
value
+     *       - & if last run's next step output was null, set it to the old 
value here
+     *   NEW: only set if different to previous step or not recoverable
+     *   - if different to previous step, set
+     *   - if there was a previous instance of this step: if we are different 
to previous instance of this step, set
+     *   - if same. has this step
+     *
+     * * this.workflowScratch - currently set on context dynamically, copied 
to oldStepInfo
+     *   NEW 1: set on context and oldStepInfo dynamically but null if same as 
previous, retrieved looking up previous
+     *   NEW 2: add up all the previous until it repeats
+     */
+
     Object lock;
 
     Duration timeout;
@@ -209,6 +233,7 @@ public class WorkflowExecutionContext {
 
     // note: when persisted, this may be omitted and restored from the 
oldStepInfo map in the Converter
     Map<String,Object> workflowScratchVariables = MutableMap.of();
+    transient Map<String,Object> workflowScratchVariables2222;
 
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     Map<String,List<Instant>> retryRecords = MutableMap.of();
@@ -371,12 +396,15 @@ public class WorkflowExecutionContext {
 
     @JsonIgnore
     public Map<String, Object> getWorkflowScratchVariables() {
-        return workflowScratchVariables;
-    }
-    @JsonGetter("workflowScratchVariables")
-    public Map<String, Object> getWorkflowScratchVariablesForPersistence() {
-        Pair<Map<String, Object>, Set<Integer>> prev = 
getPreviousStepWorkflowScratchAndBacktrackedSteps();
-        if (prev!=null && Objects.equals(prev.getLeft(), 
workflowScratchVariables)) return null;
+        if (workflowScratchVariables2222==null) {
+            Pair<Map<String, Object>, Set<Integer>> prev = 
getStepWorkflowScratchAndBacktrackedSteps(null);
+            if (prev != null) workflowScratchVariables2222=prev.getLeft();
+        }
+        if (workflowScratchVariables2222==null) {
+            workflowScratchVariables2222 = MutableMap.of();
+        }
+        // TODO
+        //checkEqual(workflowScratchVariables, workflowScratchVariables2222);
         return workflowScratchVariables;
     }
 
@@ -670,15 +698,15 @@ public class WorkflowExecutionContext {
 
     @JsonIgnore
     public Object getPreviousStepOutput() {
-        Pair<Object, Set<Integer>> p = 
getPreviousStepOutputAndBacktrackedSteps();
+        Pair<Object, Set<Integer>> p = getStepOutputAndBacktrackedSteps(null);
         if (p==null) return null;
         return p.getLeft();
     }
     @JsonIgnore
-    public Pair<Object,Set<Integer>> 
getPreviousStepOutputAndBacktrackedSteps() {
-        if (lastErrorHandlerOutput!=null) return 
Pair.of(lastErrorHandlerOutput,null);
+    Pair<Object,Set<Integer>> getStepOutputAndBacktrackedSteps(Integer 
stepOrNullForPrevious) {
+        if (stepOrNullForPrevious==null && lastErrorHandlerOutput!=null) 
return Pair.of(lastErrorHandlerOutput,null);
 
-        Integer prevSI = previousStepIndex;
+        Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : 
stepOrNullForPrevious;
         Set<Integer> previousSteps = MutableSet.of();
         while (prevSI!=null && previousSteps.add(prevSI)) {
             OldStepRecord last = oldStepInfo.get(prevSI);
@@ -691,14 +719,14 @@ public class WorkflowExecutionContext {
     }
 
     @JsonIgnore
-    public Pair<Map<String,Object>,Set<Integer>> 
getPreviousStepWorkflowScratchAndBacktrackedSteps() {
-        Integer prevSI = previousStepIndex;
+    public Pair<Map<String,Object>,Set<Integer>> 
getStepWorkflowScratchAndBacktrackedSteps(Integer stepOrNullForPrevious) {
+        Integer prevSI = stepOrNullForPrevious==null ? previousStepIndex : 
stepOrNullForPrevious;
         Set<Integer> previousSteps = MutableSet.of();
         while (prevSI!=null && previousSteps.add(prevSI)) {
             OldStepRecord last = oldStepInfo.get(prevSI);
             if (last==null) break;
             if (last.workflowScratch!=null) return 
Pair.of(last.workflowScratch, previousSteps);
-            if (last.previous.isEmpty()) break;
+            if (last.previous==null || last.previous.isEmpty()) break;
             prevSI = last.previous.iterator().next();
         }
         return null;
@@ -708,6 +736,13 @@ public class WorkflowExecutionContext {
         return output;
     }
 
+    public static void checkEqual(Object o1, Object o2) {
+        if (!Objects.equals(o1, o2)) {
+            log.warn("Objects different: " + o1 + " / " + o2);
+            throw new IllegalStateException("Objects different: " + o1 + " / " 
+ o2);
+        }
+    }
+
     public String getName() {
         return name;
     }
@@ -997,7 +1032,8 @@ public class WorkflowExecutionContext {
                                 if (currentStepInstance==null || 
currentStepInstance.getStepIndex()!=currentStepIndex) {
                                     throw new IllegalStateException("Running 
workflow at unexpected step, "+currentStepIndex+" v "+currentStepInstance);
                                 }
-                                currentStepInstance.output = null;
+                                updateOldNextStepOnThisStepStarting();
+                                updateStepOutput(currentStepInstance, null);
                                 
currentStepInstance.injectContext(WorkflowExecutionContext.this);
 
                                 log.debug("Replaying workflow '" + name + "', 
reusing instance " + currentStepInstance + " for step " + 
workflowStepReference(currentStepIndex) + ")");
@@ -1120,7 +1156,9 @@ public class WorkflowExecutionContext {
                         errorHandled = true;
 
                         currentStepInstance.next = 
WorkflowReplayUtils.getNext(result.next, STEP_TARGET_NAME_FOR_END);
-                        if (result.output != null) output = 
resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT,
 result.output, Object.class);
+                        if (result.output != null) {
+                            output = 
resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT,
 result.output, Object.class);
+                        }
 
                         moveToNextStep("Handled error in workflow around step 
" + workflowStepReference(currentStepIndex), result.next==null);
 
@@ -1177,13 +1215,13 @@ public class WorkflowExecutionContext {
             // record how it ended
             oldStepInfo.compute(previousStepIndex == null ? 
STEP_INDEX_FOR_START : previousStepIndex, (index, old) -> {
                 if (old == null) old = new OldStepRecord();
-                old.next = 
MutableSet.<Integer>of(STEP_INDEX_FOR_END).putAll(old.next);
+                old.next = MutableSet.of(STEP_INDEX_FOR_END).putAll(old.next);
                 old.nextTaskId = null;
                 return old;
             });
             oldStepInfo.compute(STEP_INDEX_FOR_END, (index, old) -> {
                 if (old == null) old = new OldStepRecord();
-                old.previous = MutableSet.<Integer>of(previousStepIndex == 
null ? STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
+                old.previous = MutableSet.of(previousStepIndex == null ? 
STEP_INDEX_FOR_START : previousStepIndex).putAll(old.previous);
                 old.previousTaskId = previousStepTaskId;
                 return old;
             });
@@ -1196,7 +1234,8 @@ public class WorkflowExecutionContext {
             }
             OldStepRecord last = oldStepInfo.get(step);
             if (last != null) {
-                workflowScratchVariables = last.workflowScratch;
+                Pair<Map<String, Object>, Set<Integer>> prev = 
getStepWorkflowScratchAndBacktrackedSteps(step);
+                if (prev!=null) workflowScratchVariables = prev.getLeft();
                 previousStepIndex = last.previous==null ? null : 
last.previous.stream().findFirst().orElse(null);
 
             } else {
@@ -1209,6 +1248,7 @@ public class WorkflowExecutionContext {
             }
             // and ensure not null
             if (workflowScratchVariables == null) workflowScratchVariables = 
MutableMap.of();
+            workflowScratchVariables2222 = workflowScratchVariables;
         }
 
         private void initializeFromContinuationInstructions(Integer 
replayFromStep) {
@@ -1243,7 +1283,9 @@ public class WorkflowExecutionContext {
                 
resetWorkflowContextPreviousAndScratchVarsToStep(currentStepIndex, false);
             }
             if (continuationInstructions.customWorkflowScratchVariables!=null) 
{
+                getWorkflowScratchVariables();
                 
workflowScratchVariables.putAll(continuationInstructions.customWorkflowScratchVariables);
+                
workflowScratchVariables2222.putAll(continuationInstructions.customWorkflowScratchVariables);
             }
 
         }
@@ -1289,7 +1331,7 @@ public class WorkflowExecutionContext {
         }
 
         private Object endWithSuccess() {
-            
WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, 
task, output);
+            
WorkflowReplayUtils.updateOnWorkflowSuccess(WorkflowExecutionContext.this, 
task, getOutput());
             persist();
             return output;
         }
@@ -1379,32 +1421,37 @@ public class WorkflowExecutionContext {
 
             persist();
 
-            BiConsumer<Object,Object> onFinish = (output,overrideNext) -> {
+            BiConsumer<Object,Object> onFinish = 
(stepOutputDefinition,overrideNext) -> {
                 currentStepInstance.next = 
WorkflowReplayUtils.getNext(overrideNext, currentStepInstance, step);
-                if (output!=null) currentStepInstance.output = 
resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT,
 output, Object.class);
-
-                // optimization, clear the value here if we can simply take it 
from the previous step;
-                // taking care not to break over loops
-                Pair<Object, Set<Integer>> prev = 
getPreviousStepOutputAndBacktrackedSteps();
-                if (prev!=null && prev.getRight()!=null && 
Objects.equals(currentStepInstance.output, prev.getLeft()) && 
!prev.getRight().contains(currentStepIndex)) {
-                    currentStepInstance.output = null;
+                if (stepOutputDefinition!=null) {
+                    Object outputResolved = 
resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_FINISHING_POST_OUTPUT,
 stepOutputDefinition, Object.class);
+                    updateStepOutput(currentStepInstance, outputResolved);
+                }
+                if (currentStepInstance.output2222 != null) {
+                    Pair<Object, Set<Integer>> prev = 
getStepOutputAndBacktrackedSteps(null);
+                    if (prev != null && Objects.equals(prev.getLeft(), 
currentStepInstance.output2222)) {
+                        // optimization, clear the value here if we can simply 
take it from the previous step
+                        currentStepInstance.output2222 = null;
+                    }
                 }
             };
 
             // now run the step
             try {
                 Duration duration = step.getTimeout();
+                Object newOutput;
                 if (duration!=null) {
                     boolean isEnded = 
DynamicTasks.queue(t).blockUntilEnded(duration);
                     if (isEnded) {
-                        currentStepInstance.output = t.getUnchecked();
+                        newOutput = t.getUnchecked();
                     } else {
                         t.cancel(true);
                         throw new TimeoutException("Timeout after 
"+duration+": "+t.getDisplayName());
                     }
                 } else {
-                    currentStepInstance.output = 
DynamicTasks.queue(t).getUnchecked();
+                    newOutput = DynamicTasks.queue(t).getUnchecked();
                 }
+                updateStepOutput(currentStepInstance, newOutput);
 
                 // allow output to be customized / overridden
                 onFinish.accept(step.output, null);
@@ -1496,6 +1543,31 @@ public class WorkflowExecutionContext {
         }
     }
 
+    private void updateStepOutput(WorkflowStepInstanceExecutionContext step, 
Object newOutput) {
+        step.output2222 = step.output = newOutput;
+    }
+    private void updateOldNextStepOnThisStepStarting() {
+        // at step start, we update the _next_ record to have a copy of our 
old output and workflow vars
+        OldStepRecord old = oldStepInfo.get(currentStepInstance.stepIndex);
+        if (old.next!=null && !old.next.isEmpty()) {
+            Integer lastNext = old.next.iterator().next();
+            OldStepRecord oldNext = oldStepInfo.get(lastNext);
+            if (oldNext!=null) {
+                // below will access the _previous_ 
StepInstanceExecutionContext, as oldStepRecord.context is update at end of step
+                if (old.context.output2222!=null && 
oldNext.context.output2222==null) {
+                    // thus below gets the _previous_ output known at this 
step, saving it in the next
+                    oldNext.context.output2222 = 
currentStepInstance.output2222;
+//                    Pair<Object, Set<Integer>> prevOutput = 
getStepOutputAndBacktrackedSteps(currentStepInstance.stepIndex);
+//                    if (prevOutput != null) {
+//                        oldNext.context.output2222 = prevOutput.getLeft();
+//                    }
+                }
+
+                // TODO workflow vars
+            }
+        }
+    }
+
     public Maybe<Pair<Integer,Boolean>> getIndexOfStepId(String next) {
         if (next==null) return Maybe.absent("Null step ID supplied");
         Function<WorkflowExecutionContext, Integer> predefined = 
PREDEFINED_NEXT_TARGETS.get(next.toLowerCase());
@@ -1541,7 +1613,7 @@ public class WorkflowExecutionContext {
         @Override
         public WorkflowExecutionContext convert(WorkflowExecutionContext 
value) {
             if (value.workflowScratchVariables==null || 
value.workflowScratchVariables.isEmpty()) {
-                Pair<Map<String, Object>, Set<Integer>> prev = 
value.getPreviousStepWorkflowScratchAndBacktrackedSteps();
+                Pair<Map<String, Object>, Set<Integer>> prev = 
value.getStepWorkflowScratchAndBacktrackedSteps(null);
                 if (prev!=null && prev.getLeft()!=null) 
value.workflowScratchVariables = prev.getLeft();
             }
             // note: no special handling for output; currently we set that as 
null as we go along
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index d9fad13f78..f7612fb87f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -41,6 +41,7 @@ import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.javalang.Boxing;
 import org.apache.brooklyn.util.time.Time;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +113,7 @@ public class WorkflowExpressionResolution {
             }
 
             if ("output".equals(key)) {
-                if (context.output!=null) return 
TemplateProcessor.wrapAsTemplateModel(context.output);
+                if (context.getOutput()!=null) return 
TemplateProcessor.wrapAsTemplateModel(context.getOutput());
                 if (context.currentStepInstance!=null && 
context.currentStepInstance.output!=null) return 
TemplateProcessor.wrapAsTemplateModel(context.currentStepInstance.output);
                 if (context.getPreviousStepOutput()!=null) return 
TemplateProcessor.wrapAsTemplateModel(context.getPreviousStepOutput());
                 return ifNoMatches();
@@ -230,7 +231,7 @@ public class WorkflowExpressionResolution {
             if ("error".equals(key)) return 
TemplateProcessor.wrapAsTemplateModel(errorHandlerContext!=null ? 
errorHandlerContext.error : null);
 
             if ("input".equals(key)) return 
TemplateProcessor.wrapAsTemplateModel(context.input);
-            if ("output".equals(key)) return 
TemplateProcessor.wrapAsTemplateModel(context.output);
+            if ("output".equals(key)) return 
TemplateProcessor.wrapAsTemplateModel(context.getOutput());
 
             //current_step.yyy and previous_step.yyy (where yyy is any of the 
above)
             //step.xxx.yyy ? - where yyy is any of the above and xxx any step 
id
@@ -290,7 +291,9 @@ public class WorkflowExpressionResolution {
 
             if ("input".equals(key)) return 
TemplateProcessor.wrapAsTemplateModel(step.input);
             if ("output".equals(key)) {
-                return TemplateProcessor.wrapAsTemplateModel(step.output != 
null ? step.output : MutableMap.of());
+                Pair<Object, Set<Integer>> outputOfStep = 
context.getStepOutputAndBacktrackedSteps(step.stepIndex);
+                Object output = (outputOfStep != null && 
outputOfStep.getLeft() != null) ? outputOfStep.getLeft() : MutableMap.of();
+                return TemplateProcessor.wrapAsTemplateModel(output);
             }
 
             return ifNoMatches();
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index 1342a1e66e..43e0a308a5 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -28,7 +28,6 @@ import 
org.apache.brooklyn.core.entity.internal.ConfigUtilsInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +90,7 @@ public class WorkflowStepInstanceExecutionContext {
     Set<BrooklynTaskTags.WorkflowTaskTag> subWorkflows = MutableSet.of();
 
     Object output;
+    Object output2222;
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     public Map<String,Object> otherMetadata = MutableMap.of();
 
@@ -166,6 +166,7 @@ public class WorkflowStepInstanceExecutionContext {
     }
     public void setOutput(Object output) {
         this.output = output;
+        this.output2222 = output;
     }
 
     @JsonIgnore

Reply via email to