This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 5e5324937746ae08bc7ea3adadb143b6215fc200 Author: Alex Heneveld <[email protected]> AuthorDate: Tue May 23 19:20:12 2023 +0100 update children test for interruption and replay --- .../core/workflow/WorkflowStepDefinition.java | 2 +- .../steps/appmodel/UpdateChildrenWorkflowStep.java | 20 +++--- .../WorkflowNestedAndCustomExtensionTest.java | 3 +- .../workflow/WorkflowUpdateChildrenStepTest.java | 77 +++++++++++++++++++++- 4 files changed, 89 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java index ed9cc1e644..0a72c575be 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java @@ -169,7 +169,7 @@ public abstract class WorkflowStepDefinition { Task<?> t = Tasks.builder().displayName(specialName!=null ? specialName : computeName(context, true)) .tag(tagOverride != null ? tagOverride : BrooklynTaskTags.tagForWorkflow(context)) .tag(BrooklynTaskTags.WORKFLOW_TAG) - .tag(TaskTags.INESSENTIAL_TASK) // we handle this specially, don't want the thread to fail + .tag(TaskTags.INESSENTIAL_TASK) // need this so parent's queue doesn't abort if this fails, parent is able to run error handling .body(() -> { log.debug("Starting " + (specialName!=null ? specialName : "step "+context.getWorkflowExectionContext().getWorkflowStepReference(context.stepIndex, this)) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java index 95fc497882..73815a6622 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java @@ -156,7 +156,10 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement UpdateChildrenStepState stepState = getStepState(context); if (stepState.matchCheck!=null && stepState.matchCheck.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.matchCheck.workflowTag.getWorkflowId())); if (stepState.creationCheck!=null && stepState.creationCheck.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.creationCheck.workflowTag.getWorkflowId())); + if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.onCreate.workflowTag.getWorkflowId())); + if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.onUpdate.workflowTag.getWorkflowId())); if (stepState.deletionCheck!=null && stepState.deletionCheck.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.deletionCheck.workflowTag.getWorkflowId())); + if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) return MutableList.of(retrieveSubWorkflow(context, stepState.onDelete.workflowTag.getWorkflowId())); return Collections.emptyList(); } @@ -177,7 +180,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement WorkflowTagWithResult<List> matchCheck = new WorkflowTagWithResult<>(); WorkflowTagWithResult<List<Map>> creationCheck = new WorkflowTagWithResult<>(); - WorkflowTagWithResult<Object> onCreation = new WorkflowTagWithResult<>(); + WorkflowTagWithResult<Object> onCreate = new WorkflowTagWithResult<>(); WorkflowTagWithResult<Object> onUpdate = new WorkflowTagWithResult<>(); WorkflowTagWithResult<List> deletionCheck = new WorkflowTagWithResult<>(); WorkflowTagWithResult<Object> onDelete = new WorkflowTagWithResult<>(); @@ -309,7 +312,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement MutableMap.copyOf(x).add("child", ((TransientEntityReference) x.get("child")).getEntity(mgmt)) ).collect(Collectors.toList()); runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, - "Calling onCreate on newly created children ("+stringMatchesToCreate.size()+")", stepState.onCreation, ON_CREATE_WORKFLOW, + "Calling on_create on newly created children ("+stringMatchesToCreate.size()+")", stepState.onCreate, ON_CREATE_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of( MutableMap.of( "step", "invoke-effector on_create", @@ -335,7 +338,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement } } runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, - "Calling onCreate on newly created children ("+stringMatchesToCreate.size()+")", stepState.onUpdate, ON_UPDATE_WORKFLOW, + "Calling on_update on item-matched children ("+onUpdateTargets.size()+")", stepState.onUpdate, ON_UPDATE_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of( MutableMap.of( "step", "invoke-effector on_update", @@ -356,7 +359,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement List<Entity> entitiesToPossiblyDelete = MutableList.copyOf(oldChildren.values()); List<TransientEntityReference> deletionChecks = runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, - "Creating new children ("+stringMatchesToCreate.size()+")", stepState.deletionCheck, DELETION_CHECK_WORKFLOW, + "Checking old children ("+entitiesToPossiblyDelete.size()+") for deletion", stepState.deletionCheck, DELETION_CHECK_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of("return true")), checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow, foreach -> { @@ -375,7 +378,7 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement List<Map<String,Object>> onDeleteTargets = (List) deletionChecks.stream().map(t -> t.getEntity(mgmt)).filter(x -> x!=null).map(x -> MutableMap.of("child", x)).collect(Collectors.toList()); runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, instructionsForResuming, subworkflowTargetForResuming, - "Calling onCreate on newly created children ("+stringMatchesToCreate.size()+")", stepState.onDelete, ON_DELETE_WORKFLOW, + "Calling on_delete on children to delete ("+onDeleteTargets.size()+")", stepState.onDelete, ON_DELETE_WORKFLOW, () -> new CustomWorkflowStep(MutableList.of( MutableMap.of( "step", "invoke-effector on_delete", @@ -419,15 +422,16 @@ public class UpdateChildrenWorkflowStep extends WorkflowStepDefinition implement name, outerWorkflowConfig, null, null, null); stepSubState.workflowTag = BrooklynTaskTags.tagForWorkflow(matchWorkflow); - WorkflowReplayUtils.addNewSubWorkflow(context, stepState.matchCheck.workflowTag); + WorkflowReplayUtils.addNewSubWorkflow(context, stepSubState.workflowTag); setStepState(context, stepState); stepSubState.result = postprocess.apply((List) DynamicTasks.queue(matchWorkflow.getTask(true).get()).getUnchecked()); } else { stepSubState.result = postprocess.apply((List) WorkflowReplayUtils.replayResumingInSubWorkflow("workflow effector", context, subworkflowTargetForResuming, instructionsForResuming, (w, e)-> { - LOG.debug("Sub workflow "+w+" is not replayable; running anew ("+ Exceptions.collapseText(e)+")"); - return doTaskBody(context); + throw Exceptions.propagate(e); + //LOG.debug("Sub workflow "+w+" is not replayable; running anew ("+ Exceptions.collapseText(e)+")"); + //return doTaskBody(context); }, true)); } stepSubState.workflowTag = null; diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java index c8b1411003..b5389fcb30 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java @@ -306,8 +306,9 @@ public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<Test WorkflowExecutionContext lastWf = new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app).get(wfId); log.info("replaying from last"); lastInvocation = lastWf.factory(false).createTaskReplaying(lastWf.factory(false) -// .makeInstructionsForReplayingFromLastReplayable("test", true)); .makeInstructionsForReplayResuming("test", true)); + // can also test this, but less interesting +// .makeInstructionsForReplayingFromLastReplayable("test", true)); Entities.submit(app, lastInvocation); } log.info("success for iteration "+(i+1)+"\n"); diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java index 1695c41be7..50604ab796 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java @@ -18,21 +18,35 @@ */ package org.apache.brooklyn.core.workflow; +import com.google.common.collect.Iterables; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.Effectors; import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityAsserts; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport; import org.apache.brooklyn.entity.stock.BasicApplication; import org.apache.brooklyn.entity.stock.BasicEntity; +import org.apache.brooklyn.entity.stock.BasicEntityImpl; +import org.apache.brooklyn.entity.stock.BasicStartable; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; +import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.text.Strings; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -100,7 +114,7 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport " x_name: name2", "- step: update-children id ${item.x_id} from ${items}", " blueprint:", - " type: "+BasicEntity.class.getName(), + " type: " + BasicEntity.class.getName(), " name: ${item.x_name}", ""), "first run at children"); @@ -147,7 +161,6 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport e -> Asserts.expectedFailureContainsIgnoreCase(e, "non-static", "item")); } - @Test public void testCrudWithHandlers() { WorkflowExecutionContext execution = WorkflowBasicTest.runWorkflow(app, Strings.lines( @@ -227,4 +240,62 @@ public class WorkflowUpdateChildrenStepTest extends BrooklynMgmtUnitTestSupport EntityAsserts.assertAttributeEquals(app, Sensors.newStringSensor("deleted"), "name3"); } -} + static class UpdateableChildThing extends BasicEntityImpl { + + static final AttributeSensor<String> BLOCKAGE = Sensors.newStringSensor("blockage"); + + @Override + public void init() { + super.init(); + WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "on_update") + .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, + MutableMap.of("items", null)) + .configure(WorkflowEffector.STEPS, MutableList.of( + MutableMap.of("condition", MutableMap.of("sensor", "blockage"), + "s", "fail message Blockage ${entity.sensor.blockage}"), + "set-entity-name ${item.x_name}")) + .configure(WorkflowEffector.REPLAYABLE, "from start") + ); + eff.apply(this); + } + } + + @Test + public void testCrudHandlersAsEffectorsReplaying() { + WorkflowBasicTest.addRegisteredTypeSpec(mgmt, "updateable-thing", UpdateableChildThing.class, Entity.class); + String updateWorkflow = Strings.lines( + "- update-children type updateable-thing id ${item.x_id} from ${entity.sensor.items}" + ); + Runnable update = () -> WorkflowBasicTest.runWorkflow(app, updateWorkflow, "test").getTask(false).get().getUnchecked(); + + app.sensors().set(Sensors.newSensor(Object.class, "items"), + MutableList.of(MutableMap.of("x_id", "one", "x_name", "name1"), MutableMap.of("x_id", "two", "x_name", "name2"))); + update.run(); + Set<String> childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertEquals(childrenIds, MutableSet.of("one", "two")); + Set<String> childrenNames = app.getChildren().stream().map(c -> c.getDisplayName()).collect(Collectors.toSet()); + Asserts.assertEquals(childrenNames, MutableSet.of("name1", "name2")); + + // setting a sensor to fail in a sub-workflow should fail the update + Entity child2 = Iterables.get(app.getChildren(), 1); + child2.sensors().set(UpdateableChildThing.BLOCKAGE, "blocked!"); + WorkflowExecutionContext w1 = WorkflowBasicTest.runWorkflow(app, updateWorkflow, "test"); + Asserts.assertFailsWith(() -> w1.getTask(true).get().getUnchecked(), + e -> Asserts.expectedFailureContainsIgnoreCase(e, "blocked!")); + child2.sensors().set(UpdateableChildThing.BLOCKAGE, null); + WorkflowExecutionContext.Factory w1f = w1.factory(false); + // but we can resume and this time it works + Task<Object> w1r = Entities.submit(app, w1f.createTaskReplaying(w1f.makeInstructionsForReplayResuming("blockage fixed", false))); + w1r.getUnchecked(); + // and we still have the two children + childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertEquals(childrenIds, MutableSet.of("one", "two")); + + app.sensors().set(Sensors.newSensor(Object.class, "items"), MutableList.of()); + update.run(); + childrenIds = app.getChildren().stream().map(c -> c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet()); + Asserts.assertSize(childrenIds, 0); + } + +} \ No newline at end of file
