This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 8e00a08c378dfe909f855d279624682d64164736 Author: Alex Heneveld <[email protected]> AuthorDate: Tue May 23 22:24:07 2023 +0100 tidy replay/resume semantics, fix rare bug in subworkflow resumption --- .../core/workflow/WorkflowCommonConfig.java | 2 + .../core/workflow/WorkflowExecutionContext.java | 6 +- .../core/workflow/WorkflowReplayUtils.java | 61 ++++++--- .../core/workflow/WorkflowStepDefinition.java | 150 ++++++++++++++------- .../core/workflow/steps/CustomWorkflowStep.java | 4 +- .../steps/appmodel/InvokeEffectorWorkflowStep.java | 31 ++++- .../steps/appmodel/UpdateChildrenWorkflowStep.java | 31 +++-- .../workflow/steps/flow/SwitchWorkflowStep.java | 4 +- .../WorkflowNestedAndCustomExtensionTest.java | 7 +- 9 files changed, 211 insertions(+), 85 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java index def2f79940..142c7deddd 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowCommonConfig.java @@ -46,9 +46,11 @@ public interface WorkflowCommonConfig { ConfigKey<String> RETENTION = ConfigKeys.newStringConfigKey("retention", "Specification for how long workflow should be retained"); + // see docs settings.md - eg 'from start' on workflow, or 'from here [only]' on step ConfigKey<String> REPLAYABLE = ConfigKeys.newStringConfigKey("replayable", "Indication of from what points the workflow is replayable"); + // see docs settings.md - 'all' if set on workflow (use with case), or yes/no/default on a step; consider also 'replayable' ConfigKey<String> IDEMPOTENT = ConfigKeys.newStringConfigKey("idempotent", "Indication of which steps in the workflow are idempotent"); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index a3552bab06..0319263f87 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -78,6 +78,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.brooklyn.core.workflow.WorkflowReplayUtils.ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT; + @JsonInclude(JsonInclude.Include.NON_NULL) public class WorkflowExecutionContext { @@ -451,7 +453,8 @@ public class WorkflowExecutionContext { } if (!forced && replayFromStep == null) { - if (!WorkflowReplayUtils.isReplayResumable(WorkflowExecutionContext.this, false, allowInternallyEvenIfDisabled)) { + // instructions should be made even if subworkflows might reject them; that's the intention of "resume" without force, vs replay from last + if (!WorkflowReplayUtils.isReplayResumable(WorkflowExecutionContext.this, RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT, allowInternallyEvenIfDisabled)) { if (code != null) { // we could allow this, but we don't need it throw new IllegalArgumentException("Cannot supply code to here without forcing as workflow does not support replay resuming at this point"); @@ -933,6 +936,7 @@ public class WorkflowExecutionContext { if (!replaying) initializeWithoutContinuationInstructions(replayFromStep); continueOnErrorHandledOrNextReplay = false; + lastErrorHandlerOutput = null; WorkflowReplayUtils.updateOnWorkflowTaskStartupOrReplay(WorkflowExecutionContext.this, task, getStepsResolved(), !replaying, replayFromStep); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java index 35c42600d1..5f48b8a72a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java @@ -25,7 +25,6 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -49,7 +48,12 @@ public class WorkflowReplayUtils { private static final Logger log = LoggerFactory.getLogger(WorkflowReplayUtils.class); - public static boolean isReplayResumable(WorkflowExecutionContext workflowExecutionContext, boolean requireDeeplyReplayable, boolean allowInternallyEvenIfDisabled) { + public enum ReplayResumeDepthCheck { + RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE, + RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE, + RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT, + } + public static boolean isReplayResumable(WorkflowExecutionContext workflowExecutionContext, ReplayResumeDepthCheck requireDeeplyReplayable, boolean allowInternallyEvenIfDisabled) { WorkflowStepInstanceExecutionContext csi = workflowExecutionContext.currentStepInstance; if (csi!=null) { if (csi.getStepIndex()!=workflowExecutionContext.currentStepIndex) { @@ -64,17 +68,28 @@ public class WorkflowReplayUtils { // 'no' is (currently) not overridable by auto-detection, but `yes` is. if (Boolean.FALSE.equals(idempotence)) return false; - // if 'yes' or null, we check. + // if 'yes' or null, we check subworkflows if (stepDefinition instanceof WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) { - List<WorkflowExecutionContext> subWorkflows = ((WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) stepDefinition).getSubWorkflowsForReplay(csi, false, true, allowInternallyEvenIfDisabled); - if (subWorkflows == null) return false; - if (!requireDeeplyReplayable) return true; - return subWorkflows.stream().allMatch(sub -> isReplayResumable(sub, requireDeeplyReplayable, allowInternallyEvenIfDisabled)); + WorkflowStepDefinition.SubWorkflowsForReplay subWorkflowReplayable = ((WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow) stepDefinition).getSubWorkflowsForReplay(csi, false, true, allowInternallyEvenIfDisabled); + + if (!subWorkflowReplayable.isResumableAtSubworkflows) { + if (subWorkflowReplayable.hasNonResumableWorkflows && requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE) return false; + return subWorkflowReplayable.isResumableOnlyAtParent; + } + + // non-null subworkflows, so need to inspect subworkflows + if (requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_WHENEVER_NESTED_WORKFLOWS_PRESENT) return true; + if (requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE) + return subWorkflowReplayable.subworkflows.stream().allMatch(sub -> isReplayableAnywhere(sub, allowInternallyEvenIfDisabled)); + if (requireDeeplyReplayable==ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_RESUMABLE) + return subWorkflowReplayable.subworkflows.stream().allMatch(sub -> isReplayResumable(sub, requireDeeplyReplayable, allowInternallyEvenIfDisabled)); + // shouldn't come here + throw new IllegalArgumentException("Invalid requireDeeply mode: "+requireDeeplyReplayable); } if (idempotence!=null) return idempotence; - // shouldn't come here + // comes here if a workflow step without subworkflows declares null default idempotence, which should not normally happen return false; } else { @@ -115,7 +130,7 @@ public class WorkflowReplayUtils { || (workflowExecutionContext.currentStepInstance!=null && workflowExecutionContext.currentStepInstance.getStepIndex()!=workflowExecutionContext.currentStepIndex) || workflowExecutionContext.currentStepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_START || workflowExecutionContext.currentStepIndex==WorkflowExecutionContext.STEP_INDEX_FOR_END - || isReplayResumable(workflowExecutionContext, true, allowInternallyEvenIfDisabled); + || isReplayResumable(workflowExecutionContext, ReplayResumeDepthCheck.RESUMABLE_IFF_NESTED_WORKFLOWS_REPLAYABLE_OR_RESUMABLE, allowInternallyEvenIfDisabled); } /** throws error if any argument non-blank invalid; null if nothing to do; otherwise a consumer which will initialize the WEC */ @@ -128,7 +143,7 @@ public class WorkflowReplayUtils { boolean idempotentAll = "all".equals(idempotent); if (idempotentAll) idempotent = ""; - if (!Strings.isBlank(idempotent)) throw new IllegalArgumentException("Invalid value for `idemopotent` on workflow step"); + if (!Strings.isBlank(idempotent)) throw new IllegalArgumentException("Invalid value for `idempotent` on workflow step"); // replayable: // @@ -431,7 +446,7 @@ public class WorkflowReplayUtils { } } catch (Exception e) { Exceptions.propagateIfFatal(e); - log.debug("Step " + context.getWorkflowStepReference() + " could not resume nested workflow " + w.getWorkflowId() + " (running anew): "+e); + log.debug("Step " + context.getWorkflowStepReference() + " could not resume nested workflow " + (w==null ? "<null>" : w.getWorkflowId()) + " (running alternate): "+e); return Pair.of(false, ifNotReplayable.apply(w, e)); } @@ -453,11 +468,14 @@ public class WorkflowReplayUtils { } public static boolean addNewSubWorkflow(WorkflowStepInstanceExecutionContext context, BrooklynTaskTags.WorkflowTaskTag nw) { + if (nw==null) throw new IllegalArgumentException("Workflow tag must not be null"); return context.getSubWorkflows().add(nw); } - public static List<WorkflowExecutionContext> getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { + public static WorkflowStepDefinition.SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled, + Consumer<WorkflowStepDefinition.SubWorkflowsForReplay> ifNoSubworkflows) { Set<BrooklynTaskTags.WorkflowTaskTag> sws = context.getSubWorkflows(); + WorkflowStepDefinition.SubWorkflowsForReplay result = new WorkflowStepDefinition.SubWorkflowsForReplay(); if (sws!=null && !sws.isEmpty()) { // replaying @@ -471,25 +489,32 @@ public class WorkflowReplayUtils { return new WorkflowStatePersistenceViaSensors(context.getManagementContext()).getWorkflows(targetEntity).get(tag.getWorkflowId()); }).collect(Collectors.toList()); + result.subworkflows = nestedWorkflowsToReplay; + if (nestedWorkflowsToReplay.isEmpty()) { if (!peekingOnly) log.info("Step "+context.getWorkflowStepReference()+" has all sub workflows superseded; replaying from start"); if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" superseded sub workflows detail: "+sws+" -> "+nestedWorkflowsToReplay); - // fall through to below + ifNoSubworkflows.accept(result); + } else if (nestedWorkflowsToReplay.contains(null)) { if (!peekingOnly) log.info("Step "+context.getWorkflowStepReference()+" has uninitialized sub workflows; replaying from start"); if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" uninitialized/unpersisted sub workflow detail: "+sws+" -> "+nestedWorkflowsToReplay); - // fall through to below + ifNoSubworkflows.accept(result); + } else if (!forced && nestedWorkflowsToReplay.stream().anyMatch(nest -> !isReplayableAnywhere(nest, allowInternallyEvenIfDisabled))) { if (!peekingOnly) log.info("Step "+context.getWorkflowStepReference()+" has non-replayable sub workflows; replaying from start"); if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" non-replayable sub workflow detail: "+sws+" -> "+nestedWorkflowsToReplay); - // fall through to below - } else { + result.hasNonResumableWorkflows = true; + // parent not resumable + } else { if (!peekingOnly) log.debug("Step "+context.getWorkflowStepReference()+" replay sub workflow detail: "+sws+" -> "+nestedWorkflowsToReplay); - return nestedWorkflowsToReplay; + result.isResumableAtSubworkflows = true; } + } else { + ifNoSubworkflows.accept(result); } - return null; + return result; } public static Object getNext(Object ...sources) { diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java index 0a72c575be..e7ac95b261 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java @@ -54,27 +54,32 @@ public abstract class WorkflowStepDefinition { private static final Logger log = LoggerFactory.getLogger(WorkflowStepDefinition.class); protected String id; + public String getId() { return id; } // name: a name to display in the UI; if omitted it is constructed from the step ID and step type protected String name; + public String getName() { return name; } + public void setName(String name) { this.name = name; } - /** freeform data for use by tools and clients */ + /** + * freeform data for use by tools and clients + */ protected Object metadata; protected String userSuppliedShorthand; protected String shorthandTypeName; @JsonInclude(JsonInclude.Include.NON_EMPTY) - protected Map<String,Object> input = MutableMap.of(); + protected Map<String, Object> input = MutableMap.of(); // next: the next step to go to, assuming the step runs and succeeds; if omitted, or if the condition does not apply, it goes to the next step per the ordering (described below) @JsonProperty("next") //use this field for access, not the getter/setter @@ -82,31 +87,38 @@ public abstract class WorkflowStepDefinition { // condition: a condition to require for the step to run; if false, the step is skipped protected Object condition; + @JsonIgnore public Object getConditionRaw() { return condition; } + @JsonIgnore public DslPredicates.DslPredicate getConditionResolved(WorkflowStepInstanceExecutionContext context) { try { return context.resolveWrapped(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, getConditionRaw(), TypeToken.of(DslPredicates.DslPredicate.class)); } catch (Exception e) { - throw Exceptions.propagateAnnotated("Unresolveable condition ("+getConditionRaw()+")", e); + throw Exceptions.propagateAnnotated("Unresolveable condition (" + getConditionRaw() + ")", e); } } // output of steps can be overridden protected Object output; - protected boolean isOutputHandledByTask() { return false; } + + protected boolean isOutputHandledByTask() { + return false; + } protected String replayable; protected String idempotent; + protected Pair<WorkflowReplayUtils.ReplayableAtStepOption, Boolean> validateReplayableAndIdempotent() { return WorkflowReplayUtils.validateReplayableAndIdempotentAtStep(replayable, idempotent, false); } @JsonProperty("timeout") protected Duration timeout; + @JsonIgnore public Duration getTimeout() { return timeout; @@ -115,6 +127,7 @@ public abstract class WorkflowStepDefinition { // TODO: might be nice to support a shorthand for on-error; but not yet @JsonProperty("on-error") protected Object onError = MutableList.of(); + @JsonInclude(JsonInclude.Include.NON_EMPTY) public Object getOnError() { return onError; @@ -133,20 +146,26 @@ public abstract class WorkflowStepDefinition { this.input.putAll(input); } - /** Returns the unresolved map of inputs */ + /** + * Returns the unresolved map of inputs + */ public Map<String, Object> getInput() { return input; } - /** note, this should _not_ have the type string first, whereas in YAML the shorthand must have the type string first */ + /** + * note, this should _not_ have the type string first, whereas in YAML the shorthand must have the type string first + */ abstract public void populateFromShorthand(String value); protected void populateFromShorthandTemplate(String template, String value) { populateFromShorthandTemplate(template, value, false, true); } + protected Maybe<Map<String, Object>> populateFromShorthandTemplate(String template, String value, boolean finalMatchRaw, boolean failOnMismatch) { Maybe<Map<String, Object>> result = new ShorthandProcessor(template).withFinalMatchRaw(finalMatchRaw).withFailOnMismatch(failOnMismatch).process(value); - if (result.isAbsent()) throw new IllegalArgumentException("Invalid shorthand expression: '"+value+"'", Maybe.Absent.getException(result)); + if (result.isAbsent()) + throw new IllegalArgumentException("Invalid shorthand expression: '" + value + "'", Maybe.Absent.getException(result)); input.putAll((Map<? extends String, ?>) CollectionMerger.builder().build().merge(input, result.get())); return result; @@ -166,43 +185,51 @@ public abstract class WorkflowStepDefinition { protected Task<?> newTask(WorkflowStepInstanceExecutionContext context, ReplayContinuationInstructions continuationInstructions, String specialName, BrooklynTaskTags.WorkflowTaskTag tagOverride) { - Task<?> t = Tasks.builder().displayName(specialName!=null ? specialName : computeName(context, true)) + Task<?> t = Tasks.builder().displayName(specialName != null ? specialName : computeName(context, true)) .tag(tagOverride != null ? tagOverride : BrooklynTaskTags.tagForWorkflow(context)) .tag(BrooklynTaskTags.WORKFLOW_TAG) .tag(TaskTags.INESSENTIAL_TASK) // need this so parent's queue doesn't abort if this fails, parent is able to run error handling .body(() -> { - log.debug("Starting " + - (specialName!=null ? specialName : "step "+context.getWorkflowExectionContext().getWorkflowStepReference(context.stepIndex, this)) - + (Strings.isNonBlank(name) ? " '"+name+"'" : "") - + (continuationInstructions!=null ? " (continuation" - + (continuationInstructions.customBehaviorExplanation !=null ? " - "+continuationInstructions.customBehaviorExplanation : "") + log.debug("Starting " + + (specialName != null ? specialName : "step " + context.getWorkflowExectionContext().getWorkflowStepReference(context.stepIndex, this)) + + (Strings.isNonBlank(name) ? " '" + name + "'" : "") + + (continuationInstructions != null ? " (continuation" + + (continuationInstructions.customBehaviorExplanation != null ? " - " + continuationInstructions.customBehaviorExplanation : "") + ")" - : "") - + " in task "+Tasks.current().getId()); - Callable<Object> handler = null; - - if (continuationInstructions!=null && this instanceof WorkflowStepDefinitionWithSubWorkflow) { - List<WorkflowExecutionContext> unfinished = ((WorkflowStepDefinitionWithSubWorkflow) this).getSubWorkflowsForReplay(context, continuationInstructions.forced, false, true); - if (unfinished!=null) { - handler = () -> - ((WorkflowStepDefinitionWithSubWorkflow) this).doTaskBodyWithSubWorkflowsForReplay(context, unfinished, continuationInstructions); - } else { - // fall through to below; the sub-workflows were not persisted so shouldn't have been started - } - } - if (handler==null) { - handler = () -> { - if (continuationInstructions != null && continuationInstructions.customBehavior != null) { - continuationInstructions.customBehavior.run(); + : "") + + " in task " + Tasks.current().getId()); + Callable<Object> handler = null; + + if (continuationInstructions != null && this instanceof WorkflowStepDefinitionWithSubWorkflow) { + // what is the difference between this block and + // WorkflowReplayUtils.replayResumingInSubWorkflow(...) + + SubWorkflowsForReplay unfinished = ((WorkflowStepDefinitionWithSubWorkflow) this).getSubWorkflowsForReplay(context, continuationInstructions.forced, false, true); + if (unfinished.isResumableAtSubworkflows) { + handler = () -> + ((WorkflowStepDefinitionWithSubWorkflow) this).doTaskBodyWithSubWorkflowsForReplay(context, unfinished.subworkflows, continuationInstructions); + } else if (!unfinished.isResumableOnlyAtParent) { + if (!continuationInstructions.forced) throw new IllegalStateException("Cannot continue, due to non-idempotent workflows and not forced"); + // fall through to below because forced + } else { + // fall through to below; no sub-workflows + } } - return doTaskBody(context); - }; - }; - - Object result = handler.call(); - if (log.isTraceEnabled()) log.trace("Completed task for "+computeName(context, true)+", output "+result); - return result; - }).build(); + if (handler == null) { + handler = () -> { + if (continuationInstructions != null && continuationInstructions.customBehavior != null) { + continuationInstructions.customBehavior.run(); + } + // if continuing, all the info needed is now in the step state on the context and should be used by the method below + return doTaskBody(context); + }; + }; + + Object result = handler.call(); + if (log.isTraceEnabled()) + log.trace("Completed task for " + computeName(context, true) + ", output " + result); + return result; + }).build(); context.taskId = t.getId(); return t; } @@ -233,9 +260,9 @@ public abstract class WorkflowStepDefinition { if (Strings.isNonBlank(context.name)) { // if there is a name, add that also, removing id if name starts with id - String last = parts.isEmpty() ? null : parts.get(parts.size()-1); - if (last!=null && context.name.startsWith(last)) { - parts.remove(parts.size()-1); + String last = parts.isEmpty() ? null : parts.get(parts.size() - 1); + if (last != null && context.name.startsWith(last)) { + parts.remove(parts.size() - 1); } parts.add(context.name); } else if (!hasId) { @@ -251,7 +278,10 @@ public abstract class WorkflowStepDefinition { return Strings.join(parts, " - "); } - public void setShorthandTypeName(String shorthandTypeDefinition) { this.shorthandTypeName = shorthandTypeDefinition; } + public void setShorthandTypeName(String shorthandTypeDefinition) { + this.shorthandTypeName = shorthandTypeDefinition; + } + @JsonProperty("shorthandTypeName") // REST API should prefer this accessor public String getShorthandTypeName() { if (Strings.isNonBlank(shorthandTypeName)) return shorthandTypeName; @@ -262,7 +292,9 @@ public abstract class WorkflowStepDefinition { return name; } - /** allows subclasses to throw exception early if required fields not set */ + /** + * allows subclasses to throw exception early if required fields not set + */ public void validateStep(@Nullable ManagementContext mgmt, @Nullable WorkflowExecutionContext workflow) { validateReplayableAndIdempotent(); } @@ -272,28 +304,50 @@ public abstract class WorkflowStepDefinition { return context.getStepState(); } + /** + * whether a step is idempotent, meaning it can be re-run if interrupted there; + * false is a hard no; true or null mean that for {@link WorkflowStepDefinitionWithSubWorkflow}, it depends on subworkflows if they have been computed and stored; + * and for other (non-subworkflow) steps, null means not sure, defaulting to false. + * <p> + * computed based on replayable and idempotent settings, then falling back to workflow-level setting 'idempotent: all' then step default idempotence + */ Boolean isIdempotent(WorkflowStepInstanceExecutionContext csi) { Boolean idempotence = validateReplayableAndIdempotent().getRight(); - if (idempotence==null) { - if (csi!=null && csi.getWorkflowExectionContext()!=null) idempotence = csi.getWorkflowExectionContext().idempotentAll; + if (idempotence == null) { + if (csi != null && csi.getWorkflowExectionContext() != null) + idempotence = csi.getWorkflowExectionContext().idempotentAll; } - if (idempotence==null) { + if (idempotence == null) { idempotence = isDefaultIdempotent(); } return idempotence; } + /** + * default value for whether this step type is idempotent; can be overridden if idempotent is set on the step or workflow; + * see {@link #isIdempotent(WorkflowStepInstanceExecutionContext)}; + * for {@link WorkflowStepDefinitionWithSubWorkflow}, null is preferred and is equivalent to true (in both cases meaning to check subworkflow state), + * otherwise null is discouraged and is equivalent to false + */ protected abstract Boolean isDefaultIdempotent(); public interface WorkflowStepDefinitionWithSpecialDeserialization { WorkflowStepDefinition applySpecialDefinition(ManagementContext mgmt, Object definition, String typeBestGuess, WorkflowStepDefinitionWithSpecialDeserialization firstParse); } + public static class SubWorkflowsForReplay { + public String notes; + public boolean hasNonResumableWorkflows; + public boolean isResumableOnlyAtParent; + public boolean isResumableAtSubworkflows; + /** subworkflows, or null if not available. in either case, this does not imply whether it is valid to resume the step. */ + public List<WorkflowExecutionContext> subworkflows; + } + public interface WorkflowStepDefinitionWithSubWorkflow { - /** returns null if this task hasn't yet recorded its subworkflows; otherwise list of those which are replayable, empty if none need to be replayed (ended successfully) */ - @JsonIgnore List<WorkflowExecutionContext> getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled); + @JsonIgnore SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled); /** called by framework if {@link #getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext, boolean, boolean, boolean)} returns non-null (empty is okay), * and the implementation pass the replay and optional custom behaviour to the subworkflows before doing any finalization; * if the subworkflow for replay is null, the normal {@link #doTaskBody(WorkflowStepInstanceExecutionContext)} is called. */ diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index 84f4f795c8..d2580ea915 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -191,8 +191,8 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl } @Override @JsonIgnore - public List<WorkflowExecutionContext> getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { - return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled); + public SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { + return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled, sw -> sw.isResumableOnlyAtParent = true); } @Override diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java index 6d6bdb99f2..3cb940e42f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/InvokeEffectorWorkflowStep.java @@ -36,6 +36,7 @@ import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; import org.apache.brooklyn.core.workflow.WorkflowReplayUtils; import org.apache.brooklyn.core.workflow.WorkflowStepDefinition; import org.apache.brooklyn.core.workflow.WorkflowStepInstanceExecutionContext; +import org.apache.brooklyn.core.workflow.steps.flow.SwitchWorkflowStep; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -70,8 +71,14 @@ public class InvokeEffectorWorkflowStep extends WorkflowStepDefinition implement } @Override @JsonIgnore - public List<WorkflowExecutionContext> getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { - return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled); + public SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { + return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled, sw -> { + // comes here if subworkflows were abnormal + StepState state = getStepState(context); + if (!state.submitted) sw.isResumableOnlyAtParent = true; // no subworkflows, fine to resume + else if (state.nonWorkflowEffector) sw.isResumableOnlyAtParent = false; // subworkflows definitely not resumable, can't resume + else sw.isResumableOnlyAtParent = true; // odd case, subworkflows perhaps cancelled and superseded but then not submitted or something odd like that + }); } @Override @@ -83,6 +90,21 @@ public class InvokeEffectorWorkflowStep extends WorkflowStepDefinition implement }, true); } + static class StepState { + boolean submitted; + boolean nonWorkflowEffector; + } + + @Override + protected StepState getStepState(WorkflowStepInstanceExecutionContext context) { + StepState result = (StepState) super.getStepState(context); + if (result==null) result = new StepState(); + return result; + } + void setStepState(WorkflowStepInstanceExecutionContext context, StepState state) { + context.setStepState(state, true); + } + @Override protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) { Object te = context.getInput(ENTITY); @@ -111,6 +133,11 @@ public class InvokeEffectorWorkflowStep extends WorkflowStepDefinition implement // unlike nested case, no need to persist as single child workflow will persist themselves imminently, and if not no great shakes to recompute }); + StepState state = getStepState(context); + state.nonWorkflowEffector = context.getSubWorkflows().isEmpty(); + state.submitted = true; + setStepState(context, state); + return DynamicTasks.queue(invocation).asTask().getUnchecked(); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java index 73815a6622..a62ef63f66 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java @@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -150,17 +149,27 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement } @Override - public List<WorkflowExecutionContext> getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { - //return WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled); - + public SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { + WorkflowExecutionContext resultW = getSubWorkflowForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled); + SubWorkflowsForReplay result; + if (resultW==null) { + result = new SubWorkflowsForReplay(); + result.isResumableOnlyAtParent = true; // always resumable betwen subworkflows + } else { + result = WorkflowReplayUtils.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled, sw -> sw.isResumableOnlyAtParent = true /* always resumable if subworkflows not interrupted */ ); + result.subworkflows = MutableList.of(resultW); + } + return result; + } + protected WorkflowExecutionContext getSubWorkflowForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { UpdateChildrenStepState stepState = getStepState(context); - if (stepState.matchCheck!=null && stepState.matchCheck.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.matchCheck.workflowTag.getWorkflowId())); - if (stepState.creationCheck!=null && stepState.creationCheck.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.creationCheck.workflowTag.getWorkflowId())); - if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.onCreate.workflowTag.getWorkflowId())); - if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.onUpdate.workflowTag.getWorkflowId())); - if (stepState.deletionCheck!=null && stepState.deletionCheck.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.deletionCheck.workflowTag.getWorkflowId())); - if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.onDelete.workflowTag.getWorkflowId())); - return Collections.emptyList(); + if (stepState.matchCheck!=null && stepState.matchCheck.workflowTag!=null) return retrieveSubWorkflow(context, stepState.matchCheck.workflowTag.getWorkflowId()); + if (stepState.creationCheck!=null && stepState.creationCheck.workflowTag!=null) return retrieveSubWorkflow(context, stepState.creationCheck.workflowTag.getWorkflowId()); + if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onCreate.workflowTag.getWorkflowId()); + if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onUpdate.workflowTag.getWorkflowId()); + if (stepState.deletionCheck!=null && stepState.deletionCheck.workflowTag!=null) return retrieveSubWorkflow(context, stepState.deletionCheck.workflowTag.getWorkflowId()); + if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) return retrieveSubWorkflow(context, stepState.onDelete.workflowTag.getWorkflowId()); + return null; } private WorkflowExecutionContext retrieveSubWorkflow(WorkflowStepInstanceExecutionContext context, String workflowId) { diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java index 524c186ec8..107bff89f6 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/SwitchWorkflowStep.java @@ -83,7 +83,7 @@ public class SwitchWorkflowStep extends WorkflowStepDefinition implements Workfl StepState state = new StepState(); state.selectedStepDefinition = selectedStepDefinition; state.selectedStepContext = selectedStepContext; - context.setStepState(context, persist); + context.setStepState(state, persist); } protected <T> Maybe<T> runOnStepStateIfHasSubWorkflows(WorkflowStepInstanceExecutionContext context, Function<WorkflowStepDefinitionWithSubWorkflow,T> fn) { StepState state = getStepState(context); @@ -141,7 +141,7 @@ public class SwitchWorkflowStep extends WorkflowStepDefinition implements Workfl } @Override - public List<WorkflowExecutionContext> getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { + public SubWorkflowsForReplay getSubWorkflowsForReplay(WorkflowStepInstanceExecutionContext context, boolean forced, boolean peekingOnly, boolean allowInternallyEvenIfDisabled) { return runOnStepStateIfHasSubWorkflows(context, s -> s.getSubWorkflowsForReplay(context, forced, peekingOnly, allowInternallyEvenIfDisabled)).get(); } diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java index b5389fcb30..517195896d 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java @@ -743,7 +743,12 @@ public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<Test if (!t.isDone()) { Asserts.fail("Workflow task should have finished: " + t.getStatusDetail(true)); } - if (!t.isError() || expectRightAnswer) result.add((Integer) t.getUnchecked()); + if (!t.isError() || expectRightAnswer) { + if (!(t.getUnchecked() instanceof Integer)) { + log.warn("ERROR - task "+t+" did not return integer; returned: "+t.getUnchecked()); + } + result.add((Integer) t.getUnchecked()); + } } }
