This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 78bd08e701cf08a66219f137dde12e89f9323352 Author: Alex Heneveld <[email protected]> AuthorDate: Tue May 23 13:03:14 2023 +0100 update children now working, with all options --- .../core/workflow/steps/CustomWorkflowStep.java | 26 ++ .../steps/appmodel/UpdateChildrenWorkflowStep.java | 331 ++++++++++++--------- .../workflow/steps/flow/ForeachWorkflowStep.java | 29 ++ .../workflow/WorkflowUpdateChildrenStepTest.java | 116 +++++++- 4 files changed, 345 insertions(+), 157 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index 8475a41056..84f4f795c8 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -82,6 +82,32 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl this.steps = steps; } + public CustomWorkflowStep(CustomWorkflowStep base) { + this.retention = base.retention; + this.target = base.target; + this.target_var_name = base.target_var_name; + this.target_index_var_name = base.target_index_var_name; + this.lock = base.lock; + this.concurrency = base.concurrency; + this.parameters = base.parameters; + this.steps = base.steps; + this.workflowOutput = base.workflowOutput; + this.reducing = base.reducing; + this.id = base.id; + this.name = base.name; + this.metadata = base.metadata; + this.userSuppliedShorthand = base.userSuppliedShorthand; + this.shorthandTypeName = base.shorthandTypeName; + this.input = base.input; + this.next = base.next; + this.condition = base.condition; + this.output = base.output; + this.replayable = base.replayable; + this.idempotent = base.idempotent; + this.timeout = base.timeout; + this.onError = base.onError; + } + @JsonCreator /** special creator for when a list is supplied as the workflow; treat those as steps, without requiring a superfluous `steps` entry in a map. * this is useful especially for config key / parameters of type `workflow` ({@link CustomWorkflowStep)}. */ diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java index e9c69371e3..95fc497882 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java @@ -19,20 +19,21 @@ package org.apache.brooklyn.core.workflow.steps.appmodel; import com.google.common.collect.Iterables; +import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.config.ConfigInheritance; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.workflow.*; import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep; +import org.apache.brooklyn.core.workflow.steps.flow.ForeachWorkflowStep; import org.apache.brooklyn.core.workflow.steps.variables.SetVariableWorkflowStep; import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.DynamicTasks; @@ -49,7 +50,8 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -58,7 +60,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement private static final Logger LOG = LoggerFactory.getLogger(UpdateChildrenWorkflowStep.class); - public static final String SHORTHAND = "[ \"of \" ${parent} \" \" ] \"type \" ${type} \" id \" ${identifier} [ \" from \" ${items} ]"; + public static final String SHORTHAND = "[ \" of \" ${parent} ] [ \" type \" ${type} ] [ \" id \" ${identifier_expression} ] [ \" from \" ${items} ]"; public static final ConfigKey<Object> PARENT = ConfigKeys.newConfigKey(Object.class, "parent", "the entity or entity ID whose children are to be updated, defaulting to the current entity; " + @@ -72,13 +74,12 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement public static final ConfigKey<List> ITEMS = ConfigKeys.newConfigKey(List.class, "items", "the list of items to be used to create/update/delete the children"); - public static final ConfigKey<CustomWorkflowStep> MATCH_CHECK_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "match_check") - .runtimeInheritance(ConfigInheritance.NONE) - .build(); - - public static final ConfigKey<CustomWorkflowStep> CREATION_CHECK_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "creation_check") - .runtimeInheritance(ConfigInheritance.NONE) - .build(); + public static final ConfigKey<CustomWorkflowStep> MATCH_CHECK_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "match_check").build(); + public static final ConfigKey<CustomWorkflowStep> CREATION_CHECK_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "creation_check").build(); + public static final ConfigKey<CustomWorkflowStep> DELETION_CHECK_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "deletion_check").build(); + public static final ConfigKey<CustomWorkflowStep> ON_CREATE_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_create").build(); + public static final ConfigKey<CustomWorkflowStep> ON_UPDATE_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_update").build(); + public static final ConfigKey<CustomWorkflowStep> ON_DELETE_WORKFLOW = ConfigKeys.builder(CustomWorkflowStep.class, "on_delete").build(); /* * `on_create`: an optionally supplied workflow to run at any newly created child, where no pre-existing child was found @@ -119,18 +120,19 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement returned by the `item_check_workflow`, with each such entity passed as an input variable `child` (along with all inputs to the `update-children` step); it can then return `true` or `false` to specify whether the child should be deleted - (with `on_delete` called prior to deletion if `true` is returned); + (with `on_delete` called prior to any deletion); this workflow may reparent the entity and return `false` if it is intended to keep the entity but disconnected from this synchronization process, or may even `delete-entity ${child}` (although that is not usually necessary) - - - step: foreach item in ${items} - reducing: - result: {} - steps: - - match-check - x */ + + // see WorkflowCommonConfig.LOCK + // TODO obtain lock on parent -> "update-children" + protected Object lock; + + // usually a string; see utils/WorkflowConcurrency + protected Object concurrency; + @Override public Logger logger() { return LOG; @@ -174,8 +176,11 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement List items; WorkflowTagWithResult<List> matchCheck = new WorkflowTagWithResult<>(); - WorkflowTagWithResult<List> creationCheck = new WorkflowTagWithResult<>(); + WorkflowTagWithResult<List<Map>> creationCheck = new WorkflowTagWithResult<>(); + WorkflowTagWithResult<Object> onCreation = new WorkflowTagWithResult<>(); + WorkflowTagWithResult<Object> onUpdate = new WorkflowTagWithResult<>(); WorkflowTagWithResult<List> deletionCheck = new WorkflowTagWithResult<>(); + WorkflowTagWithResult<Object> onDelete = new WorkflowTagWithResult<>(); } static class WorkflowTagWithResult<T> { @@ -183,6 +188,22 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement T result; } + static class TransientEntityReference { + transient Entity entity; + String entityId; + public TransientEntityReference(Entity entity) { + this.entity = entity; + this.entityId = entity==null ? null : entity.getId(); + } + public Entity getEntity(ManagementContext mgmt) { + if (entityId==null) return null; + if (entity!=null && Entities.isManagedActiveOrComingUp(entity)) return entity; + entity = mgmt.lookup(entityId, Entity.class); + if (entity!=null && Entities.isManagedActiveOrComingUp(entity)) return entity; + return null; + } + } + @Override protected UpdateChildrenStepState getStepState(WorkflowStepInstanceExecutionContext context) { return (UpdateChildrenStepState) super.getStepState(context); @@ -215,157 +236,179 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement stepState = stepStateO; } - List matches = runSubWorkflowForPhase(context, instructionsForResuming, subworkflowTargetForResuming, + Object blueprintNotYetInterpolated = resolveBlueprint(context, () -> { + String type = context.getInput(TYPE); + if (Strings.isBlank(type)) throw new IllegalStateException("blueprint or type must be supplied"); // should've been caught earlier but check again for good measure + return "type: " + StringEscapes.JavaStringEscapes.wrapJavaString(type); + }, SetVariableWorkflowStep.InterpolationMode.DISABLED, TemplateProcessor.InterpolationErrorMode.FAIL); + + BiFunction<CustomWorkflowStep, Consumer<ForeachWorkflowStep>,ConfigBag> outerWorkflowCustomers = (checkWorkflow, foreachCustom) -> { + ForeachWorkflowStep foreach = new ForeachWorkflowStep(checkWorkflow); + foreach.getInput().put("parent", stepState.parent); + foreach.getInput().put("identifier_expression", stepState.identifier_expression); + foreach.getInput().put("blueprint", blueprintNotYetInterpolated); + // TODO other vars? + if (foreach.getIdempotent() == null) foreach.setIdempotent(idempotent); + if (foreach.getConcurrency() == null) foreach.setConcurrency(concurrency); + foreachCustom.accept(foreach); + + return ConfigBag.newInstance() + .configure(WorkflowCommonConfig.STEPS, + MutableList.of(foreach)) + .configure(WorkflowCommonConfig.IDEMPOTENT, idempotent); + }; + + + List matches = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, "Matching items against children", stepState.matchCheck, MATCH_CHECK_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of( - "let id = ${${identifier}}", - "let child_or_id = ${parent.child[${id}]} ?? ${id}", + "transform identifier_expression | resolve_expression | set id", + MutableMap.of("step", "fail message identifier_expression should be a non-static expression including an interpolated reference to item", + "condition", MutableMap.of("target", "${identifier_expression}", "equals", "${id}")), + "let child_or_id = ${parent.children[id]} ?? ${id}", + "transform child_tostring = ${child_or_id} | to_string", + "transform parent_tostring = ${parent} | to_string", "return ${child_or_id}")), - checkWorkflow -> ConfigBag.newInstance() - .configure(WorkflowCommonConfig.STEPS, - MutableList.of( - MutableMap.of("step", "foreach item", - "target", stepState.items, - "steps", MutableList.of(checkWorkflow), - "input", MutableMap.of( - "parent", stepState.parent, - "identifier_expression", stepState.identifier_expression - // TODO others? - ), - "idempotent", idempotent - // TODO concurrency? - ))) - .configure(WorkflowCommonConfig.IDEMPOTENT, idempotent) ); - - List<Map<String,Object>> stringMatches = MutableList.of(); + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(stepState.items); + foreach.setTargetVarName("item"); + }), + list -> (List) list.stream().map(m -> m instanceof Entity ? new TransientEntityReference((Entity)m) : m).collect(Collectors.toList()) ); + + List<Map<String,Object>> stringMatchesToCreate = MutableList.of(); for (int i=0; i<matches.size(); i++) { Object m = matches.get(i); if (m instanceof String) { - stringMatches.add(MutableMap.of("match", m, "item", stepState.items.get(i))); - } - } - List<Map<String,Object>> entityMatches = MutableList.of(); - for (int i=0; i<matches.size(); i++) { - Object m = matches.get(i); - if (m instanceof Entity) { - stringMatches.add(MutableMap.of("match", m, "item", stepState.items.get(i))); + stringMatchesToCreate.add(MutableMap.of("match", m, "item", stepState.items.get(i), "index", i)); } } - - Object blueprintNotYetInterpolated = resolveBlueprint(context, () -> { - String type = context.getInput(TYPE); - if (Strings.isBlank(type)) throw new IllegalStateException("blueprint or type must be supplied"); // should've been caught earlier but check again for good measure - return "type: " + StringEscapes.JavaStringEscapes.wrapJavaString(type); - }, SetVariableWorkflowStep.InterpolationMode.DISABLED, TemplateProcessor.InterpolationErrorMode.FAIL); - - Set<Entity> addedChildren = MutableSet.copyOf(runSubWorkflowForPhase(context, instructionsForResuming, subworkflowTargetForResuming, - "Creating new children ("+stringMatches.size()+")", stepState.creationCheck, CREATION_CHECK_WORKFLOW, + List<Map> addedChildren = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + "Creating new children ("+stringMatchesToCreate.size()+")", stepState.creationCheck, CREATION_CHECK_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of( + "transform blueprint_resolved = ${blueprint} | resolve_expression", MutableMap.of( "step", "add-entity", - "parent", stepState.parent, - "blueprint", blueprintNotYetInterpolated + "parent", "${parent}", + "blueprint", "${blueprint_resolved}" ), "let result = ${output.entity}", MutableMap.of("step", "set-config", "config", MutableMap.of("entity", "${result}", "name", BrooklynConfigKeys.PLAN_ID.getName()), "value", "${match}"), "return ${result}")), - checkWorkflow -> ConfigBag.newInstance() - .configure(WorkflowCommonConfig.STEPS, - MutableList.of( - MutableMap.of("step", "foreach {match,item}", - "target", stringMatches, - "steps", MutableList.of(checkWorkflow), - "input", MutableMap.of( - "parent", stepState.parent, - "identifier_expression", stepState.identifier_expression - // TODO others? - ), - "idempotent", idempotent - // TODO concurrency? - ))) - .configure(WorkflowCommonConfig.IDEMPOTENT, idempotent) )); - - // TODO default lock on parent -> "update-children" - - Set<Entity> updatedChildren = (Set) entityMatches.stream().map(m -> m.get("match")).collect(Collectors.toSet()); - -// Object oldItem = context.getWorkflowExectionContext().getWorkflowScratchVariables().get("item"); -// for (int i = 0; i<stepState.items.size(); i++) { -// Object item = stepState.items.get(i); -// Object create = stepState.creationChecks.get(i); -// if (create==null || Boolean.TRUE.equals(create) || "true".equals(create)) { -// context.getWorkflowExectionContext().getWorkflowScratchVariables().put("item", item); -// Object blueprint = resolveBlueprint(context); -// try { -// List<? extends EntitySpec> specs = blueprint instanceof EntitySpec ? MutableList.of((EntitySpec) blueprint) -// : EntityManagementUtils.getAddChildrenSpecs(mgmt, -// blueprint instanceof String ? (String) blueprint : -// BeanWithTypeUtils.newYamlMapper(mgmt, false, null, false).writeValueAsString(blueprint)); -// if (specs.size()!=1) { -// throw new IllegalStateException("Wrong number of specs returned: "+specs); -// } -// EntitySpec spec = Iterables.getOnlyElement(specs); -// spec.parent(Entities.proxy(stepState.parent)); -// if (Strings.isBlank(stepState.identifier_expression)) throw new IllegalStateException("'identifier' expression is required"); -// -// String identifier = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, stepState.identifier_expression, String.class); -// if (stepState.identifier_expression.equals(identifier)) throw new IllegalStateException("'identifier' must be an expression, e.g. '${item.id_field}' not the static value '"+stepState.identifier_expression +"'"); -// if (Strings.isBlank(identifier)) throw new IllegalStateException("'identifier' must not resolve to an empty string"); -// spec.configure(BrooklynConfigKeys.PLAN_ID, identifier); -// -// Entity child = (Entity) ((EntityInternal) context.getEntity()).getExecutionContext().get(Tasks.<Entity>builder().dynamic(false) -// .displayName("Creating entity " + -// (Strings.isNonBlank(spec.getDisplayName()) ? spec.getDisplayName() : spec.getType().getName())) -// .body(() -> mgmt.getEntityManager().createEntity(spec /* TODO , idempotentIdentifier */)) -// .build()); -// -// addedChildren.add(child); -// -// // TODO on_create -// -// } catch (JsonProcessingException e) { -// throw new RuntimeException(e); -// } -// -// } else if (Boolean.FALSE.equals(create) || "false".equals(create)) { -// // ignore -// -// } else if (create instanceof Entity) { -// updatedChildren.add( (Entity)create ); -// -// // TODO on_update -// -// } else { -// throw new IllegalStateException("Invalid result from match/creation check for item '"+item+"': "+create); -// } -// } -// context.getWorkflowExectionContext().getWorkflowScratchVariables().put("item", oldItem); + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(stringMatchesToCreate); + foreach.setTargetVarName("{match,item,index}"); + foreach.setWorkflowOutput(MutableMap.of("index", "${index}", "match", "${match}", "child", "${output}", "item", "${item}")); + }), + list -> (List) list.stream().map(x -> MutableMap.copyOf( ((Map)x) ).add("child", new TransientEntityReference((Entity) ((Map)x).get("child")))).collect(Collectors.toList()) ); + + List<Map<String,Object>> onCreateTargets = (List) addedChildren.stream().map(x -> + MutableMap.copyOf(x).add("child", ((TransientEntityReference) x.get("child")).getEntity(mgmt)) + ).collect(Collectors.toList()); + runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + "Calling onCreate on newly created children ("+stringMatchesToCreate.size()+")", stepState.onCreation, ON_CREATE_WORKFLOW, + () -> new CustomWorkflowStep(MutableList.of( + MutableMap.of( + "step", "invoke-effector on_create", + "entity", "${child}", + "args", MutableMap.of("item", "${item}"), + "condition", MutableMap.of("target", "${child.effector.on_create}") + )) ), + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(onCreateTargets); + foreach.setTargetVarName("{child,item,index}"); + }), + list -> list.size()); + + List<Map<String,Object>> onUpdateTargets = MutableList.copyOf(onCreateTargets); + for (int i=0; i<matches.size(); i++) { + Object m = matches.get(i); + if (m instanceof TransientEntityReference) { + m = ((TransientEntityReference)m).getEntity(mgmt); + } + if (m instanceof Entity) { + onUpdateTargets.add(MutableMap.of("child", m, "item", stepState.items.get(i), "index", i)); + } + } + runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + "Calling onCreate on newly created children ("+stringMatchesToCreate.size()+")", stepState.onUpdate, ON_UPDATE_WORKFLOW, + () -> new CustomWorkflowStep(MutableList.of( + MutableMap.of( + "step", "invoke-effector on_update", + "entity", "${child}", + "args", MutableMap.of("item", "${item}"), + "condition", MutableMap.of("target", "${child.effector.on_update}") + )) ), + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(onUpdateTargets); + foreach.setTargetVarName("{child,item,index}"); + }), + list -> list.size()); Map<String,Entity> oldChildren = MutableMap.of(); stepState.parent.getChildren().forEach(c -> oldChildren.put(c.getId(), c)); - addedChildren.forEach(c -> oldChildren.remove(c.getId())); - updatedChildren.forEach(c -> oldChildren.remove(c.getId())); - - // TODO deletion_check, on_delete - for (Entity oldChild: oldChildren.values()) { - stepState.parent.removeChild(oldChild); + onUpdateTargets.forEach(c -> oldChildren.remove( ((Entity)c.get("child")).getId()) ); + + List<Entity> entitiesToPossiblyDelete = MutableList.copyOf(oldChildren.values()); + List<TransientEntityReference> deletionChecks = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + "Creating new children ("+stringMatchesToCreate.size()+")", stepState.deletionCheck, DELETION_CHECK_WORKFLOW, + () -> new CustomWorkflowStep(MutableList.of("return true")), + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(entitiesToPossiblyDelete); + foreach.setTargetVarName("child"); + foreach.setWorkflowOutput(MutableMap.of("delete", "${output}", "child", "${child}")); + }), + list -> (List) list.stream().map(x -> { + Object check = ((Map) x).get("delete"); + Entity child = (Entity) ((Map) x).get("child"); + check = TypeCoercions.coerce(check, Boolean.class); + if (check==null) throw new IllegalStateException("Invalid deletion check result for "+child+": "+check); + if (!Boolean.TRUE.equals(check)) return null; + return new TransientEntityReference(child); + }).filter(x -> x!=null).collect(Collectors.toList()) ); + + List<Map<String,Object>> onDeleteTargets = (List) deletionChecks.stream().map(t -> t.getEntity(mgmt)).filter(x -> x!=null).map(x -> MutableMap.of("child", x)).collect(Collectors.toList()); + runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, + "Calling onCreate on newly created children ("+stringMatchesToCreate.size()+")", stepState.onDelete, ON_DELETE_WORKFLOW, + () -> new CustomWorkflowStep(MutableList.of( + MutableMap.of( + "step", "invoke-effector on_delete", + "entity", "${child}", + "condition", MutableMap.of("target", "${child.effector.on_delete}") + )) ), + checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, + foreach -> { + foreach.setTarget(onDeleteTargets); + foreach.setTargetVarName("{child}"); + }), + list -> list.size()); + + for (TransientEntityReference entityToDelete: deletionChecks) { + Entity entity = entityToDelete.getEntity(mgmt); + if (entity!=null && Entities.isManagedActiveOrComingUp(entity)) Entities.unmanage(entity); } return context.getPreviousStepOutput(); } - protected <T> T runSubWorkflowForPhase(WorkflowStepInstanceExecutionContext context, ReplayContinuationInstructions instructionsForResuming, WorkflowExecutionContext subworkflowTargetForResuming, - String name, - WorkflowTagWithResult<T> stepSubState, ConfigKey<CustomWorkflowStep> key, - Supplier<CustomWorkflowStep> defaultWorkflow, Function<CustomWorkflowStep, ConfigBag> outerWorkflowConfigFn) { + protected <T> T runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(WorkflowStepInstanceExecutionContext context, ReplayContinuationInstructions instructionsForResuming, WorkflowExecutionContext subworkflowTargetForResuming, + String name, + WorkflowTagWithResult<T> stepSubState, ConfigKey<CustomWorkflowStep> key, + Supplier<CustomWorkflowStep> defaultWorkflow, Function<CustomWorkflowStep, ConfigBag> outerWorkflowConfigFn, + Function<List,T> postprocess) { if (stepSubState.result==null) { ManagementContext mgmt = context.getManagementContext(); UpdateChildrenStepState stepState = getStepState(context); if (stepSubState.workflowTag ==null) { - CustomWorkflowStep checkWorkflow = context.getInput(key); + CustomWorkflowStep checkWorkflow = context.getWorkflowExectionContext().resolveCoercingOnly(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, + context.getInputRaw(key.getName()), TypeToken.of(CustomWorkflowStep.class)); if (checkWorkflow == null) { checkWorkflow = defaultWorkflow.get(); } @@ -379,13 +422,13 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement WorkflowReplayUtils.addNewSubWorkflow(context, stepState.matchCheck.workflowTag); setStepState(context, stepState); - stepSubState.result = (T) DynamicTasks.queue(matchWorkflow.getTask(true).get()).getUnchecked(); + stepSubState.result = postprocess.apply((List) DynamicTasks.queue(matchWorkflow.getTask(true).get()).getUnchecked()); } else { - stepSubState.result = (T) WorkflowReplayUtils.replayResumingInSubWorkflow("workflow effector", context, subworkflowTargetForResuming, instructionsForResuming, + stepSubState.result = postprocess.apply((List) WorkflowReplayUtils.replayResumingInSubWorkflow("workflow effector", context, subworkflowTargetForResuming, instructionsForResuming, (w, e)-> { LOG.debug("Sub workflow "+w+" is not replayable; running anew ("+ Exceptions.collapseText(e)+")"); return doTaskBody(context); - }, true); + }, true)); } stepSubState.workflowTag = null; setStepState(context, stepState); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java index 1b0a6324ef..4d9879b72b 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/ForeachWorkflowStep.java @@ -31,6 +31,15 @@ public class ForeachWorkflowStep extends CustomWorkflowStep { public static final String SHORTHAND_TYPE_NAME_DEFAULT = "foreach"; + public ForeachWorkflowStep() {} + + public ForeachWorkflowStep(CustomWorkflowStep base) { + super(base); + } + + public void setTarget(Object x) { this.target = x; } + public void setTargetVarName(Object x) { this.target_var_name = x; } + @Override public void populateFromShorthand(String value) { if (input==null) input = MutableMap.of(); @@ -68,4 +77,24 @@ public class ForeachWorkflowStep extends CustomWorkflowStep { super.initializeSubWorkflowForTarget(context, target, nestedWorkflowContext); } + public void setIdempotent(String idempotent) { + this.idempotent = idempotent; + } + + public String getIdempotent() { + return idempotent; + } + + public void setConcurrency(Object concurrency) { + this.concurrency = concurrency; + } + + public Object getConcurrency() { + return concurrency; + } + + public void setWorkflowOutput(Object x) { + this.workflowOutput = x; + } + } diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java index fd3e8af8e0..1695c41be7 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java @@ -18,8 +18,11 @@ */ package org.apache.brooklyn.core.workflow; +import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport; import org.apache.brooklyn.entity.stock.BasicApplication; import org.apache.brooklyn.entity.stock.BasicEntity; @@ -29,6 +32,7 @@ import org.apache.brooklyn.util.text.Strings; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Collection; import java.util.Set; import java.util.stream.Collectors; @@ -54,9 +58,7 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport "- step: let items", " value:", " - x_id: one", - " x_name: name1", " - x_id: two", - " x_name: name2", "- update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}"), "first run at children"); execution.getTask(false).get().getUnchecked(); @@ -68,9 +70,7 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport "- step: let items", " value:", " - x_id: one", - " x_name: name1", " - x_id: two", - " x_name: name-too", "- update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}"), "first run at children"); execution.getTask(false).get().getUnchecked(); @@ -89,6 +89,30 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport Asserts.assertSize(childrenIds, 0); } + @Test + public void testCreateWithBlueprint() { + WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( + "- step: let items", + " value:", + " - x_id: one", + " x_name: name1", + " - x_id: two", + " x_name: name2", + "- step: update-children id ${item.x_id} from ${items}", + " blueprint:", + " type: "+BasicEntity.class.getName(), + " name: ${item.x_name}", + ""), + "first run at children"); + execution.getTask(false).get().getUnchecked(); + + Set<String> childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertEquals(childrenIds, MutableSet.of("one", "two")); + + Set<String> childrenNames = app.getChildren().stream().map(c -> c.getDisplayName()).collect(Collectors.toSet()); + Asserts.assertEquals(childrenNames, MutableSet.of("name1", "name2")); + } + @Test public void testCustomMatch() { WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( @@ -98,19 +122,53 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport " x_name: name1", " - x_id: two", " x_name: name2", - "- step: update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}", + "- step: update-children type " + BasicEntity.class.getName() + " from ${items}", " match_check:", - " - condition:", + " - step: return ONE", + " condition:", " target: ${item.x_id}", " equals: one", - " return: ", - "", + " - return ${item.x_id}", ""), "first run at children"); execution.getTask(false).get().getUnchecked(); + Set<String> childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertEquals(childrenIds, MutableSet.of("ONE", "two")); + } + + @Test + public void testStaticIdentifierGivesError() { + WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( + "- let list items = [ ignored ]", + "- update-children type " + BasicEntity.class.getName() + " id item.x_id from ${items}"), + "first run at children"); + Asserts.assertFailsWith(() -> execution.getTask(false).get().getUnchecked(), + e -> Asserts.expectedFailureContainsIgnoreCase(e, "non-static", "item")); + } + + + @Test + public void testCrudWithHandlers() { + WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( + "- step: let items", + " value:", + " - x_id: one", + " x_name: name1", + " - x_id: two", + " x_name: name2", + "- step: update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}", + " on_create:", + " - step: set-entity-name ${item.x_name}", + " entity: ${child}"), + "create run"); + execution.getTask(false).get().getUnchecked(); + Set<String> childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); Asserts.assertEquals(childrenIds, MutableSet.of("one", "two")); + Set<String> childrenNames = app.getChildren().stream().map(c -> c.getDisplayName()).collect(Collectors.toSet()); + Asserts.assertEquals(childrenNames, MutableSet.of("name1", "name2")); + Collection<Entity> oldChildren = app.getChildren(); execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( "- step: let items", @@ -119,22 +177,54 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport " x_name: name1", " - x_id: two", " x_name: name-too", - "- update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}"), - "first run at children"); + " - x_id: three", + " x_name: name3", + "- step: update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}", + " on_update:", + " - step: set-entity-name ${item.x_name}", + " entity: ${child}"), + "update run"); + execution.getTask(false).get().getUnchecked(); + childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertEquals(childrenIds, MutableSet.of("one", "two", "three")); + childrenNames = app.getChildren().stream().map(c -> c.getDisplayName()).collect(Collectors.toSet()); + Asserts.assertEquals(childrenNames, MutableSet.of("name1", "name-too", "name3")); + Asserts.assertThat(app.getChildren(), x -> x.containsAll(oldChildren)); // didn't replace children + + execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( + "- step: let list items", + " value: []", + "- step: update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}", + " deletion_check:", + " - condition:", + " target: ${child.config['" + BrooklynConfigKeys.PLAN_ID.getName() + "']}", + " equals: one", + " step: return true", + " - return false", + " on_update:", + " - set-sensor update_should_not_be_called = but was", + " on_delete:", + " - set-sensor deleted = ${child.name}"), + "delete run"); execution.getTask(false).get().getUnchecked(); childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); - Asserts.assertEquals(childrenIds, MutableSet.of("one", "two")); + Asserts.assertEquals(childrenIds, MutableSet.of("two", "three")); + EntityAsserts.assertAttributeEquals(app, Sensors.newStringSensor("deleted"), "name1"); + EntityAsserts.assertAttributeEquals(app, Sensors.newStringSensor("update_should_not_be_called"), null); // not called execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( "- step: let list items", " value: []", - "- update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}"), - "first run at children"); + "- step: update-children type " + BasicEntity.class.getName() + " id ${item.x_id} from ${items}", + " on_delete:", + " - set-sensor deleted = ${child.name}"), + "delete run"); execution.getTask(false).get().getUnchecked(); childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); Asserts.assertSize(childrenIds, 0); + EntityAsserts.assertAttributeEquals(app, Sensors.newStringSensor("deleted"), "name3"); } }
