This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 8e00a08c378dfe909f855d279624682d64164736
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue May 23 22:24:07 2023 +0100

    tidy replay/resume semantics, fix rare bug in subworkflow resumption
---
 .../core/workflow/WorkflowCommonConfig.java        |   2 +
 .../core/workflow/WorkflowExecutionContext.java    |   6 +-
 .../core/workflow/WorkflowReplayUtils.java         |  61 ++++++---
 .../core/workflow/WorkflowStepDefinition.java      | 150 ++++++++++++++-------
 .../core/workflow/steps/CustomWorkflowStep.java    |   4 +-
 .../steps/appmodel/InvokeEffectorWorkflowStep.java |  31 ++++-
 .../steps/appmodel/UpdateChildrenWorkflowStep.java |  31 +++--
 .../workflow/steps/flow/SwitchWorkflowStep.java    |   4 +-
 .../WorkflowNestedAndCustomExtensionTest.java      |   7 +-
 9 files changed, 211 insertions(+), 85 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java
index def2f79940..142c7deddd 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java
@@ -46,9 +46,11 @@ public interface WorkflowCommonConfig {
     ConfigKey<String> RETENTION = ConfigKeys.newStringConfigKey("retention",
             "Specification for how long workflow should be retained");
 
+    // see docs settings.md - eg 'from start' on workflow, or 'from here 
[only]' on step
     ConfigKey<String> REPLAYABLE = ConfigKeys.newStringConfigKey("replayable",
             "Indication of from what points the workflow is replayable");
 
+    // see docs settings.md - 'all' if set on workflow (use with case), or 
yes/no/default on a step; consider also 'replayable'
     ConfigKey<String> IDEMPOTENT = ConfigKeys.newStringConfigKey("idempotent",
             "Indication of which steps in the workflow are idempotent");
 
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index a3552bab06..0319263f87 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -78,6 +78,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static 
org.apache.brooklyn.core.workflow.WorkflowReplayUtils.ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT;
+
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class WorkflowExecutionContext {
 
@@ -451,7 +453,8 @@ public class WorkflowExecutionContext {
             }
 
             if (!forced && replayFromStep == null) {
-                if 
(!WorkflowReplayUtils.isReplayResumable(WorkflowExecutionContext.this, false, 
allowInternallyEvenIfDisabled)) {
+                // instructions should be made even if subworkflows might 
reject them; that's the intention of "resume" without force, vs replay from last
+                if 
(!WorkflowReplayUtils.isReplayResumable(WorkflowExecutionContext.this, 
RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT, allowInternallyEvenIfDisabled)) {
                     if (code != null) {
                         // we could allow this, but we don't need it
                         throw new IllegalArgumentException("Cannot supply code 
to here without forcing as workflow does not support replay resuming at this 
point");
@@ -933,6 +936,7 @@ public class WorkflowExecutionContext {
                         if (!replaying) 
initializeWithoutContinuationInstructions(replayFromStep);
 
                         continueOnErrorHandledOrNextReplay = false;
+                        lastErrorHandlerOutput = null;
 
                         
WorkflowReplayUtils.updateOnWorkflowTaskStartupOrReplay(WorkflowExecutionContext.this,
 task, getStepsResolved(), !replaying, replayFromStep);
 
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
index 35c42600d1..5f48b8a72a 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
@@ -25,7 +25,6 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import 
org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -49,7 +48,12 @@ public class WorkflowReplayUtils {
     private static final Logger log = 
LoggerFactory.getLogger(WorkflowReplayUtils.class);
 
 
-    public static boolean isReplayResumable(WorkflowExecutionContext 
workflowExecutionContext, boolean requireDeeplyReplayable, boolean 
allowInternallyEvenIfDisabled) {
+    public enum ReplayResumeDepthCheck {
+        RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE,
+        RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE,
+        RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT,
+    }
+    public static boolean isReplayResumable(WorkflowExecutionContext 
workflowExecutionContext, ReplayResumeDepthCheck requireDeeplyReplayable, 
boolean allowInternallyEvenIfDisabled) {
         WorkflowStepInstanceExecutionContext csi = 
workflowExecutionContext.currentStepInstance;
         if (csi!=null) {
             if (csi.getStepIndex()!=workflowExecutionContext.currentStepIndex) 
{
@@ -64,17 +68,28 @@ public class WorkflowReplayUtils {
                 // 'no' is (currently) not overridable by auto-detection, but 
`yes` is.
                 if (Boolean.FALSE.equals(idempotence)) return false;
 
-                // if 'yes' or null, we check.
+                // if 'yes' or null, we check subworkflows
                 if (stepDefinition instanceof 
WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) {
-                    List<WorkflowExecutionContext> subWorkflows = 
((WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) 
stepDefinition).getSubWorkflowsForReplay(csi, false, true, 
allowInternallyEvenIfDisabled);
-                    if (subWorkflows == null) return false;
-                    if (!requireDeeplyReplayable) return true;
-                    return subWorkflows.stream().allMatch(sub -> 
isReplayResumable(sub, requireDeeplyReplayable, allowInternallyEvenIfDisabled));
+                    WorkflowStepDefinition.SubWorkflowsForReplay 
subWorkflowReplayable = 
((WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) 
stepDefinition).getSubWorkflowsForReplay(csi, false, true, 
allowInternallyEvenIfDisabled);
+
+                    if (!subWorkflowReplayable.isResumableAtSubworkflows) {
+                        if (subWorkflowReplayable.hasNonResumableWorkflows && 
requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE)
 return false;
+                        return subWorkflowReplayable.isResumableOnlyAtParent;
+                    }
+
+                    // non-null subworkflows, so need to inspect subworkflows
+                    if 
(requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT)
 return true;
+                    if 
(requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE)
+                        return 
subWorkflowReplayable.subworkflows.stream().allMatch(sub -> 
isReplayableAnywhere(sub, allowInternallyEvenIfDisabled));
+                    if 
(requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE)
+                        return 
subWorkflowReplayable.subworkflows.stream().allMatch(sub -> 
isReplayResumable(sub, requireDeeplyReplayable, allowInternallyEvenIfDisabled));
+                    // shouldn't come here
+                    throw new IllegalArgumentException("Invalid requireDeeply 
mode: "+requireDeeplyReplayable);
                 }
 
                 if (idempotence!=null) return idempotence;
 
-                // shouldn't come here
+                // comes here if a workflow step without subworkflows declares 
null default idempotence, which should not normally happen
                 return false;
 
             } else {
@@ -115,7 +130,7 @@ public class WorkflowReplayUtils {
                 || (workflowExecutionContext.currentStepInstance!=null && 
workflowExecutionContext.currentStepInstance.getStepIndex()!=workflowExecutionContext.currentStepIndex)
                 || 
workflowExecutionContext.currentStepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_START
                 || 
workflowExecutionContext.currentStepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_END
-                || isReplayResumable(workflowExecutionContext, true, 
allowInternallyEvenIfDisabled);
+                || isReplayResumable(workflowExecutionContext, 
ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE, 
allowInternallyEvenIfDisabled);
     }
 
     /** throws error if any argument non-blank invalid; null if nothing to do; 
otherwise a consumer which will initialize the WEC */
@@ -128,7 +143,7 @@ public class WorkflowReplayUtils {
         boolean idempotentAll = "all".equals(idempotent);
         if (idempotentAll) idempotent = "";
 
-        if (!Strings.isBlank(idempotent)) throw new 
IllegalArgumentException("Invalid value for `idemopotent` on workflow step");
+        if (!Strings.isBlank(idempotent)) throw new 
IllegalArgumentException("Invalid value for `idempotent` on workflow step");
 
         // replayable:
         //
@@ -431,7 +446,7 @@ public class WorkflowReplayUtils {
             }
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
-            log.debug("Step " + context.getWorkflowStepReference() + " could 
not resume nested workflow " + w.getWorkflowId() + " (running anew): "+e);
+            log.debug("Step " + context.getWorkflowStepReference() + " could 
not resume nested workflow " + (w==null ? "<null>" : w.getWorkflowId()) + " 
(running alternate): "+e);
 
             return Pair.of(false, ifNotReplayable.apply(w, e));
         }
@@ -453,11 +468,14 @@ public class WorkflowReplayUtils {
     }
 
     public static boolean 
addNewSubWorkflow(WorkflowStepInstanceExecutionContext context, 
BrooklynTaskTags.WorkflowTaskTag nw) {
+        if (nw==null) throw new IllegalArgumentException("Workflow tag must 
not be null");
         return context.getSubWorkflows().add(nw);
     }
 
-    public static List<WorkflowExecutionContext> 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
+    public static WorkflowStepDefinition.SubWorkflowsForReplay 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled,
+                                                                               
         Consumer<WorkflowStepDefinition.SubWorkflowsForReplay> 
ifNoSubworkflows) {
         Set<BrooklynTaskTags.WorkflowTaskTag> sws = context.getSubWorkflows();
+        WorkflowStepDefinition.SubWorkflowsForReplay result = new 
WorkflowStepDefinition.SubWorkflowsForReplay();
         if (sws!=null && !sws.isEmpty()) {
             // replaying
 
@@ -471,25 +489,32 @@ public class WorkflowReplayUtils {
                         return new 
WorkflowStatePersistenceViaSensors(context.getManagementContext()).getWorkflows(targetEntity).get(tag.getWorkflowId());
                     }).collect(Collectors.toList());
 
+            result.subworkflows = nestedWorkflowsToReplay;
+
             if (nestedWorkflowsToReplay.isEmpty()) {
                 if (!peekingOnly) log.info("Step 
"+context.getWorkflowStepReference()+" has all sub workflows superseded; 
replaying from start");
                 if (!peekingOnly) log.debug("Step 
"+context.getWorkflowStepReference()+" superseded sub workflows detail: "+sws+" 
-> "+nestedWorkflowsToReplay);
-                // fall through to below
+                ifNoSubworkflows.accept(result);
+
             } else if (nestedWorkflowsToReplay.contains(null)) {
                 if (!peekingOnly) log.info("Step 
"+context.getWorkflowStepReference()+" has uninitialized sub workflows; 
replaying from start");
                 if (!peekingOnly) log.debug("Step 
"+context.getWorkflowStepReference()+" uninitialized/unpersisted sub workflow 
detail: "+sws+" -> "+nestedWorkflowsToReplay);
-                // fall through to below
+                ifNoSubworkflows.accept(result);
+
             } else if (!forced && 
nestedWorkflowsToReplay.stream().anyMatch(nest -> !isReplayableAnywhere(nest, 
allowInternallyEvenIfDisabled))) {
                 if (!peekingOnly) log.info("Step 
"+context.getWorkflowStepReference()+" has non-replayable sub workflows; 
replaying from start");
                 if (!peekingOnly) log.debug("Step 
"+context.getWorkflowStepReference()+" non-replayable sub workflow detail: 
"+sws+" -> "+nestedWorkflowsToReplay);
-                // fall through to below
-            } else {
+                result.hasNonResumableWorkflows = true;
+                // parent not resumable
 
+            } else {
                 if (!peekingOnly) log.debug("Step 
"+context.getWorkflowStepReference()+" replay sub workflow detail: "+sws+" -> 
"+nestedWorkflowsToReplay);
-                return nestedWorkflowsToReplay;
+                result.isResumableAtSubworkflows = true;
             }
+        } else {
+            ifNoSubworkflows.accept(result);
         }
-        return null;
+        return result;
     }
 
     public static Object getNext(Object ...sources) {
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
index 0a72c575be..e7ac95b261 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
@@ -54,27 +54,32 @@ public abstract class WorkflowStepDefinition {
     private static final Logger log = 
LoggerFactory.getLogger(WorkflowStepDefinition.class);
 
     protected String id;
+
     public String getId() {
         return id;
     }
 
     //    name:  a name to display in the UI; if omitted it is constructed 
from the step ID and step type
     protected String name;
+
     public String getName() {
         return name;
     }
+
     public void setName(String name) {
         this.name = name;
     }
 
-    /** freeform data for use by tools and clients */
+    /**
+     * freeform data for use by tools and clients
+     */
     protected Object metadata;
 
     protected String userSuppliedShorthand;
     protected String shorthandTypeName;
 
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
-    protected Map<String,Object> input = MutableMap.of();
+    protected Map<String, Object> input = MutableMap.of();
 
     //    next:  the next step to go to, assuming the step runs and succeeds; 
if omitted, or if the condition does not apply, it goes to the next step per 
the ordering (described below)
     @JsonProperty("next")  //use this field for access, not the getter/setter
@@ -82,31 +87,38 @@ public abstract class WorkflowStepDefinition {
 
     //    condition:  a condition to require for the step to run; if false, 
the step is skipped
     protected Object condition;
+
     @JsonIgnore
     public Object getConditionRaw() {
         return condition;
     }
+
     @JsonIgnore
     public DslPredicates.DslPredicate 
getConditionResolved(WorkflowStepInstanceExecutionContext context) {
         try {
             return 
context.resolveWrapped(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING,
 getConditionRaw(), TypeToken.of(DslPredicates.DslPredicate.class));
         } catch (Exception e) {
-            throw Exceptions.propagateAnnotated("Unresolveable condition 
("+getConditionRaw()+")", e);
+            throw Exceptions.propagateAnnotated("Unresolveable condition (" + 
getConditionRaw() + ")", e);
         }
     }
 
     // output of steps can be overridden
     protected Object output;
-    protected boolean isOutputHandledByTask() { return false; }
+
+    protected boolean isOutputHandledByTask() {
+        return false;
+    }
 
     protected String replayable;
     protected String idempotent;
+
     protected Pair<WorkflowReplayUtils.ReplayableAtStepOption, Boolean> 
validateReplayableAndIdempotent() {
         return 
WorkflowReplayUtils.validateReplayableAndIdempotentAtStep(replayable, 
idempotent, false);
     }
 
     @JsonProperty("timeout")
     protected Duration timeout;
+
     @JsonIgnore
     public Duration getTimeout() {
         return timeout;
@@ -115,6 +127,7 @@ public abstract class WorkflowStepDefinition {
     // TODO: might be nice to support a shorthand for on-error; but not yet
     @JsonProperty("on-error")
     protected Object onError = MutableList.of();
+
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     public Object getOnError() {
         return onError;
@@ -133,20 +146,26 @@ public abstract class WorkflowStepDefinition {
         this.input.putAll(input);
     }
 
-    /** Returns the unresolved map of inputs */
+    /**
+     * Returns the unresolved map of inputs
+     */
     public Map<String, Object> getInput() {
         return input;
     }
 
-    /** note, this should _not_ have the type string first, whereas in YAML 
the shorthand must have the type string first */
+    /**
+     * note, this should _not_ have the type string first, whereas in YAML the 
shorthand must have the type string first
+     */
     abstract public void populateFromShorthand(String value);
 
     protected void populateFromShorthandTemplate(String template, String 
value) {
         populateFromShorthandTemplate(template, value, false, true);
     }
+
     protected Maybe<Map<String, Object>> populateFromShorthandTemplate(String 
template, String value, boolean finalMatchRaw, boolean failOnMismatch) {
         Maybe<Map<String, Object>> result = new 
ShorthandProcessor(template).withFinalMatchRaw(finalMatchRaw).withFailOnMismatch(failOnMismatch).process(value);
-        if (result.isAbsent()) throw new IllegalArgumentException("Invalid 
shorthand expression: '"+value+"'", Maybe.Absent.getException(result));
+        if (result.isAbsent())
+            throw new IllegalArgumentException("Invalid shorthand expression: 
'" + value + "'", Maybe.Absent.getException(result));
 
         input.putAll((Map<? extends String, ?>) 
CollectionMerger.builder().build().merge(input, result.get()));
         return result;
@@ -166,43 +185,51 @@ public abstract class WorkflowStepDefinition {
 
     protected Task<?> newTask(WorkflowStepInstanceExecutionContext context, 
ReplayContinuationInstructions continuationInstructions,
                               String specialName, 
BrooklynTaskTags.WorkflowTaskTag tagOverride) {
-        Task<?> t = Tasks.builder().displayName(specialName!=null ? 
specialName : computeName(context, true))
+        Task<?> t = Tasks.builder().displayName(specialName != null ? 
specialName : computeName(context, true))
                 .tag(tagOverride != null ? tagOverride : 
BrooklynTaskTags.tagForWorkflow(context))
                 .tag(BrooklynTaskTags.WORKFLOW_TAG)
                 .tag(TaskTags.INESSENTIAL_TASK)   // need this so parent's 
queue doesn't abort if this fails, parent is able to run error handling
                 .body(() -> {
-            log.debug("Starting " +
-                    (specialName!=null ? specialName : "step 
"+context.getWorkflowExectionContext().getWorkflowStepReference(context.stepIndex,
 this))
-                    + (Strings.isNonBlank(name) ? " '"+name+"'" : "")
-                    + (continuationInstructions!=null ? " (continuation"
-                            + 
(continuationInstructions.customBehaviorExplanation !=null ? " - 
"+continuationInstructions.customBehaviorExplanation : "")
+                    log.debug("Starting " +
+                            (specialName != null ? specialName : "step " + 
context.getWorkflowExectionContext().getWorkflowStepReference(context.stepIndex,
 this))
+                            + (Strings.isNonBlank(name) ? " '" + name + "'" : 
"")
+                            + (continuationInstructions != null ? " 
(continuation"
+                            + 
(continuationInstructions.customBehaviorExplanation != null ? " - " + 
continuationInstructions.customBehaviorExplanation : "")
                             + ")"
-                        : "")
-                    + " in task "+Tasks.current().getId());
-            Callable<Object> handler = null;
-
-            if (continuationInstructions!=null && this instanceof 
WorkflowStepDefinitionWithSubWorkflow) {
-                List<WorkflowExecutionContext> unfinished = 
((WorkflowStepDefinitionWithSubWorkflow) 
this).getSubWorkflowsForReplay(context, continuationInstructions.forced, false, 
true);
-                if (unfinished!=null) {
-                    handler = () ->
-                            ((WorkflowStepDefinitionWithSubWorkflow) 
this).doTaskBodyWithSubWorkflowsForReplay(context, unfinished, 
continuationInstructions);
-                } else {
-                    // fall through to below; the sub-workflows were not 
persisted so shouldn't have been started
-                }
-            }
-            if (handler==null) {
-                handler = () -> {
-                    if (continuationInstructions != null && 
continuationInstructions.customBehavior != null) {
-                        continuationInstructions.customBehavior.run();
+                            : "")
+                            + " in task " + Tasks.current().getId());
+                    Callable<Object> handler = null;
+
+                    if (continuationInstructions != null && this instanceof 
WorkflowStepDefinitionWithSubWorkflow) {
+                        // TODO: clarify the difference between this block and
+                        // WorkflowReplayUtils.replayResumingInSubWorkflow(...)
+
+                        SubWorkflowsForReplay unfinished = 
((WorkflowStepDefinitionWithSubWorkflow) 
this).getSubWorkflowsForReplay(context, continuationInstructions.forced, false, 
true);
+                        if (unfinished.isResumableAtSubworkflows) {
+                            handler = () ->
+                                    ((WorkflowStepDefinitionWithSubWorkflow) 
this).doTaskBodyWithSubWorkflowsForReplay(context, unfinished.subworkflows, 
continuationInstructions);
+                        } else if (!unfinished.isResumableOnlyAtParent) {
+                            if (!continuationInstructions.forced) throw new 
IllegalStateException("Cannot continue, due to non-idempotent workflows and not 
forced");
+                            // fall through to below because forced
+                        } else {
+                            // fall through to below; no sub-workflows
+                        }
                     }
-                    return doTaskBody(context);
-                };
-            };
-
-            Object result = handler.call();
-            if (log.isTraceEnabled()) log.trace("Completed task for 
"+computeName(context, true)+", output "+result);
-            return result;
-        }).build();
+                    if (handler == null) {
+                        handler = () -> {
+                            if (continuationInstructions != null && 
continuationInstructions.customBehavior != null) {
+                                continuationInstructions.customBehavior.run();
+                            }
+                            // if continuing, all the info needed is now in 
the step state on the context and should be used by the method below
+                            return doTaskBody(context);
+                        };
+                    };
+
+                    Object result = handler.call();
+                    if (log.isTraceEnabled())
+                        log.trace("Completed task for " + computeName(context, 
true) + ", output " + result);
+                    return result;
+                }).build();
         context.taskId = t.getId();
         return t;
     }
@@ -233,9 +260,9 @@ public abstract class WorkflowStepDefinition {
 
         if (Strings.isNonBlank(context.name)) {
             // if there is a name, add that also, removing id if name starts 
with id
-            String last = parts.isEmpty() ? null : parts.get(parts.size()-1);
-            if (last!=null && context.name.startsWith(last)) {
-                parts.remove(parts.size()-1);
+            String last = parts.isEmpty() ? null : parts.get(parts.size() - 1);
+            if (last != null && context.name.startsWith(last)) {
+                parts.remove(parts.size() - 1);
             }
             parts.add(context.name);
         } else if (!hasId) {
@@ -251,7 +278,10 @@ public abstract class WorkflowStepDefinition {
         return Strings.join(parts, " - ");
     }
 
-    public void setShorthandTypeName(String shorthandTypeDefinition) { 
this.shorthandTypeName = shorthandTypeDefinition; }
+    public void setShorthandTypeName(String shorthandTypeDefinition) {
+        this.shorthandTypeName = shorthandTypeDefinition;
+    }
+
     @JsonProperty("shorthandTypeName")  // REST API should prefer this accessor
     public String getShorthandTypeName() {
         if (Strings.isNonBlank(shorthandTypeName)) return shorthandTypeName;
@@ -262,7 +292,9 @@ public abstract class WorkflowStepDefinition {
         return name;
     }
 
-    /** allows subclasses to throw exception early if required fields not set 
*/
+    /**
+     * allows subclasses to throw exception early if required fields not set
+     */
     public void validateStep(@Nullable ManagementContext mgmt, @Nullable 
WorkflowExecutionContext workflow) {
         validateReplayableAndIdempotent();
     }
@@ -272,28 +304,50 @@ public abstract class WorkflowStepDefinition {
         return context.getStepState();
     }
 
+    /**
+     * whether a step is idempotent, meaning it can be re-run if interrupted 
there;
+     * false is a hard no; true or null mean that for {@link 
WorkflowStepDefinitionWithSubWorkflow}, it depends on subworkflows if they have 
been computed and stored;
+     * and for other (non-subworkflow) steps, null means not sure, defaulting 
to false.
+     * <p>
+     * computed based on replayable and idempotent settings, then falling back 
to workflow-level setting 'idempotent: all' then step default idempotence
+     */
     Boolean isIdempotent(WorkflowStepInstanceExecutionContext csi) {
         Boolean idempotence = validateReplayableAndIdempotent().getRight();
 
-        if (idempotence==null) {
-            if (csi!=null && csi.getWorkflowExectionContext()!=null) 
idempotence = csi.getWorkflowExectionContext().idempotentAll;
+        if (idempotence == null) {
+            if (csi != null && csi.getWorkflowExectionContext() != null)
+                idempotence = csi.getWorkflowExectionContext().idempotentAll;
         }
-        if (idempotence==null) {
+        if (idempotence == null) {
             idempotence = isDefaultIdempotent();
         }
 
         return idempotence;
     }
 
+    /**
+     * default value for whether this step type is idempotent; can be 
overridden if idempotent is set on the step or workflow;
+     * see {@link #isIdempotent(WorkflowStepInstanceExecutionContext)};
+     * for {@link WorkflowStepDefinitionWithSubWorkflow}, null is preferred 
and is equivalent to true (in both cases meaning to check subworkflow state),
+     * otherwise null is discouraged and is equivalent to false
+     */
     protected abstract Boolean isDefaultIdempotent();
 
     public interface WorkflowStepDefinitionWithSpecialDeserialization {
         WorkflowStepDefinition applySpecialDefinition(ManagementContext mgmt, 
Object definition, String typeBestGuess, 
WorkflowStepDefinitionWithSpecialDeserialization firstParse);
     }
 
+    public static class SubWorkflowsForReplay {
+        public String notes;
+        public boolean hasNonResumableWorkflows;
+        public boolean isResumableOnlyAtParent;
+        public boolean isResumableAtSubworkflows;
+        /** subworkflows, or null if not available; in either case, this does 
not indicate whether it is valid to resume the step. */
+        public List<WorkflowExecutionContext> subworkflows;
+    }
+
     public interface WorkflowStepDefinitionWithSubWorkflow {
-        /** returns null if this task hasn't yet recorded its subworkflows; 
otherwise list of those which are replayable, empty if none need to be replayed 
(ended successfully) */
-        @JsonIgnore List<WorkflowExecutionContext> 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled);
+        @JsonIgnore SubWorkflowsForReplay 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled);
         /** called by framework if {@link 
#getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext, boolean, 
boolean, boolean)} returns non-null (empty is okay),
          * and the implementation pass the replay and optional custom 
behaviour to the subworkflows before doing any finalization;
          * if the subworkflow for replay is null,  the normal {@link 
#doTaskBody(WorkflowStepInstanceExecutionContext)} is called. */
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index 84f4f795c8..d2580ea915 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -191,8 +191,8 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
     }
 
     @Override @JsonIgnore
-    public List<WorkflowExecutionContext> 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
-        return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, 
peekingOnly, allowInternallyEvenIfDisabled);
+    public SubWorkflowsForReplay 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
+        return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, 
peekingOnly, allowInternallyEvenIfDisabled, sw -> sw.isResumableOnlyAtParent = 
true);
     }
 
     @Override
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java
index 6d6bdb99f2..3cb940e42f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java
@@ -36,6 +36,7 @@ import 
org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
 import org.apache.brooklyn.core.workflow.WorkflowReplayUtils;
 import org.apache.brooklyn.core.workflow.WorkflowStepDefinition;
 import org.apache.brooklyn.core.workflow.WorkflowStepInstanceExecutionContext;
+import org.apache.brooklyn.core.workflow.steps.flow.SwitchWorkflowStep;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -70,8 +71,14 @@ public class InvokeEffectorWorkflowStep extends 
WorkflowStepDefinition implement
     }
 
     @Override @JsonIgnore
-    public List<WorkflowExecutionContext> 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
-        return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, 
peekingOnly, allowInternallyEvenIfDisabled);
+    public SubWorkflowsForReplay 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
+        return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, 
peekingOnly, allowInternallyEvenIfDisabled, sw -> {
+            // comes here if subworkflows were abnormal
+            StepState state = getStepState(context);
+            if (!state.submitted) sw.isResumableOnlyAtParent = true;  // no 
subworkflows, fine to resume
+            else if (state.nonWorkflowEffector) sw.isResumableOnlyAtParent = 
false;  // subworkflows definitely not resumable, can't resume
+            else sw.isResumableOnlyAtParent = true; // odd case, subworkflows 
perhaps cancelled and superseded but then not submitted or something odd like 
that
+        });
     }
 
     @Override
@@ -83,6 +90,21 @@ public class InvokeEffectorWorkflowStep extends 
WorkflowStepDefinition implement
                 }, true);
     }
 
+    static class StepState {
+        boolean submitted;
+        boolean nonWorkflowEffector;
+    }
+
+    @Override
+    protected StepState getStepState(WorkflowStepInstanceExecutionContext 
context) {
+        StepState result = (StepState) super.getStepState(context);
+        if (result==null) result = new StepState();
+        return result;
+    }
+    void setStepState(WorkflowStepInstanceExecutionContext context, StepState 
state) {
+        context.setStepState(state, true);
+    }
+
     @Override
     protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {
         Object te = context.getInput(ENTITY);
@@ -111,6 +133,11 @@ public class InvokeEffectorWorkflowStep extends 
WorkflowStepDefinition implement
             // unlike nested case, no need to persist as single child workflow 
will persist themselves imminently, and if not no great shakes to recompute
         });
 
+        StepState state = getStepState(context);
+        state.nonWorkflowEffector = context.getSubWorkflows().isEmpty();
+        state.submitted = true;
+        setStepState(context, state);
+
         return DynamicTasks.queue(invocation).asTask().getUnchecked();
     }
 
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
index 73815a6622..a62ef63f66 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
@@ -150,17 +149,27 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
     }
 
     @Override
-    public List<WorkflowExecutionContext> 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
-        //return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, 
peekingOnly, allowInternallyEvenIfDisabled);
-
+    public SubWorkflowsForReplay 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
+        WorkflowExecutionContext resultW = getSubWorkflowForReplay(context, 
forced, peekingOnly, allowInternallyEvenIfDisabled);
+        SubWorkflowsForReplay result;
+        if (resultW==null) {
+            result = new SubWorkflowsForReplay();
+            result.isResumableOnlyAtParent = true;  // always resumable between 
subworkflows
+        } else {
+            result = WorkflowReplayUtils.getSubWorkflowsForReplay(context, 
forced, peekingOnly, allowInternallyEvenIfDisabled, sw -> 
sw.isResumableOnlyAtParent = true /* always resumable if subworkflows not 
interrupted */ );
+            result.subworkflows = MutableList.of(resultW);
+        }
+        return result;
+    }
+    protected WorkflowExecutionContext 
getSubWorkflowForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
         UpdateChildrenStepState stepState = getStepState(context);
-        if (stepState.matchCheck!=null && 
stepState.matchCheck.workflowTag!=null) return 
MutableList.of(retrieveSubWorkflow(context, 
stepState.matchCheck.workflowTag.getWorkflowId()));
-        if (stepState.creationCheck!=null && 
stepState.creationCheck.workflowTag!=null) return 
MutableList.of(retrieveSubWorkflow(context, 
stepState.creationCheck.workflowTag.getWorkflowId()));
-        if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) 
return MutableList.of(retrieveSubWorkflow(context, 
stepState.onCreate.workflowTag.getWorkflowId()));
-        if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) 
return MutableList.of(retrieveSubWorkflow(context, 
stepState.onUpdate.workflowTag.getWorkflowId()));
-        if (stepState.deletionCheck!=null && 
stepState.deletionCheck.workflowTag!=null) return 
MutableList.of(retrieveSubWorkflow(context, 
stepState.deletionCheck.workflowTag.getWorkflowId()));
-        if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) 
return MutableList.of(retrieveSubWorkflow(context, 
stepState.onDelete.workflowTag.getWorkflowId()));
-        return Collections.emptyList();
+        if (stepState.matchCheck!=null && 
stepState.matchCheck.workflowTag!=null) return retrieveSubWorkflow(context, 
stepState.matchCheck.workflowTag.getWorkflowId());
+        if (stepState.creationCheck!=null && 
stepState.creationCheck.workflowTag!=null) return retrieveSubWorkflow(context, 
stepState.creationCheck.workflowTag.getWorkflowId());
+        if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) 
return retrieveSubWorkflow(context, 
stepState.onCreate.workflowTag.getWorkflowId());
+        if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) 
return retrieveSubWorkflow(context, 
stepState.onUpdate.workflowTag.getWorkflowId());
+        if (stepState.deletionCheck!=null && 
stepState.deletionCheck.workflowTag!=null) return retrieveSubWorkflow(context, 
stepState.deletionCheck.workflowTag.getWorkflowId());
+        if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) 
return retrieveSubWorkflow(context, 
stepState.onDelete.workflowTag.getWorkflowId());
+        return null;
     }
 
     private WorkflowExecutionContext 
retrieveSubWorkflow(WorkflowStepInstanceExecutionContext context, String 
workflowId) {
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java
index 524c186ec8..107bff89f6 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java
@@ -83,7 +83,7 @@ public class SwitchWorkflowStep extends 
WorkflowStepDefinition implements Workfl
         StepState state = new StepState();
         state.selectedStepDefinition = selectedStepDefinition;
         state.selectedStepContext = selectedStepContext;
-        context.setStepState(context, persist);
+        context.setStepState(state, persist);
     }
     protected <T> Maybe<T> 
runOnStepStateIfHasSubWorkflows(WorkflowStepInstanceExecutionContext context, 
Function<WorkflowStepDefinitionWithSubWorkflow,T> fn) {
         StepState state = getStepState(context);
@@ -141,7 +141,7 @@ public class SwitchWorkflowStep extends 
WorkflowStepDefinition implements Workfl
     }
 
     @Override
-    public List<WorkflowExecutionContext> 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
+    public SubWorkflowsForReplay 
getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean 
forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) {
         return runOnStepStateIfHasSubWorkflows(context, s -> 
s.getSubWorkflowsForReplay(context, forced, peekingOnly, 
allowInternallyEvenIfDisabled)).get();
     }
 
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
index b5389fcb30..517195896d 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
@@ -743,7 +743,12 @@ public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<Test
                     if (!t.isDone()) {
                         Asserts.fail("Workflow task should have finished: " + 
t.getStatusDetail(true));
                     }
-                    if (!t.isError() || expectRightAnswer) 
result.add((Integer) t.getUnchecked());
+                    if (!t.isError() || expectRightAnswer) {
+                        if (!(t.getUnchecked() instanceof Integer)) {
+                            log.warn("ERROR - task "+t+" did not return 
integer; returned: "+t.getUnchecked());
+                        }
+                        result.add((Integer) t.getUnchecked());
+                    }
                 }
             }
 


Reply via email to