This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 5e5324937746ae08bc7ea3adadb143b6215fc200
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue May 23 19:20:12 2023 +0100

    update children test for interruption and replay
---
 .../core/workflow/WorkflowStepDefinition.java      |  2 +-
 .../steps/appmodel/UpdateChildrenWorkflowStep.java | 20 +++---
 .../WorkflowNestedAndCustomExtensionTest.java      |  3 +-
 .../workflow/WorkflowUpdateChildrenStepTest.java   | 77 +++++++++++++++++++++-
 4 files changed, 89 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
index ed9cc1e644..0a72c575be 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
@@ -169,7 +169,7 @@ public abstract class WorkflowStepDefinition {
         Task<?> t = Tasks.builder().displayName(specialName!=null ? 
specialName : computeName(context, true))
                 .tag(tagOverride != null ? tagOverride : 
BrooklynTaskTags.tagForWorkflow(context))
                 .tag(BrooklynTaskTags.WORKFLOW_TAG)
-                .tag(TaskTags.INESSENTIAL_TASK)  // we handle this specially, 
don't want the thread to fail
+                .tag(TaskTags.INESSENTIAL_TASK)   // need this so parent's 
queue doesn't abort if this fails, parent is able to run error handling
                 .body(() -> {
             log.debug("Starting " +
                     (specialName!=null ? specialName : "step 
"+context.getWorkflowExectionContext().getWorkflowStepReference(context.stepIndex,
 this))
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
index 95fc497882..73815a6622 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/appmodel/UpdateChildrenWorkflowStep.java
@@ -156,7 +156,10 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
         UpdateChildrenStepState stepState = getStepState(context);
         if (stepState.matchCheck!=null && 
stepState.matchCheck.workflowTag!=null) return 
MutableList.of(retrieveSubWorkflow(context, 
stepState.matchCheck.workflowTag.getWorkflowId()));
         if (stepState.creationCheck!=null && 
stepState.creationCheck.workflowTag!=null) return 
MutableList.of(retrieveSubWorkflow(context, 
stepState.creationCheck.workflowTag.getWorkflowId()));
+        if (stepState.onCreate !=null && stepState.onCreate.workflowTag!=null) 
return MutableList.of(retrieveSubWorkflow(context, 
stepState.onCreate.workflowTag.getWorkflowId()));
+        if (stepState.onUpdate!=null && stepState.onUpdate.workflowTag!=null) 
return MutableList.of(retrieveSubWorkflow(context, 
stepState.onUpdate.workflowTag.getWorkflowId()));
         if (stepState.deletionCheck!=null && 
stepState.deletionCheck.workflowTag!=null) return 
MutableList.of(retrieveSubWorkflow(context, 
stepState.deletionCheck.workflowTag.getWorkflowId()));
+        if (stepState.onDelete!=null && stepState.onDelete.workflowTag!=null) 
return MutableList.of(retrieveSubWorkflow(context, 
stepState.onDelete.workflowTag.getWorkflowId()));
         return Collections.emptyList();
     }
 
@@ -177,7 +180,7 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
 
         WorkflowTagWithResult<List> matchCheck = new WorkflowTagWithResult<>();
         WorkflowTagWithResult<List<Map>> creationCheck = new 
WorkflowTagWithResult<>();
-        WorkflowTagWithResult<Object> onCreation = new 
WorkflowTagWithResult<>();
+        WorkflowTagWithResult<Object> onCreate = new WorkflowTagWithResult<>();
         WorkflowTagWithResult<Object> onUpdate = new WorkflowTagWithResult<>();
         WorkflowTagWithResult<List> deletionCheck = new 
WorkflowTagWithResult<>();
         WorkflowTagWithResult<Object> onDelete = new WorkflowTagWithResult<>();
@@ -309,7 +312,7 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
                 MutableMap.copyOf(x).add("child", ((TransientEntityReference) 
x.get("child")).getEntity(mgmt))
         ).collect(Collectors.toList());
         runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, 
instructionsForResuming, subworkflowTargetForResuming,
-                "Calling onCreate on newly created children 
("+stringMatchesToCreate.size()+")", stepState.onCreation, ON_CREATE_WORKFLOW,
+                "Calling on_create on newly created children 
("+stringMatchesToCreate.size()+")", stepState.onCreate, ON_CREATE_WORKFLOW,
                 () -> new CustomWorkflowStep(MutableList.of(
                         MutableMap.of(
                                 "step", "invoke-effector on_create",
@@ -335,7 +338,7 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
             }
         }
         runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, 
instructionsForResuming, subworkflowTargetForResuming,
-                "Calling onCreate on newly created children 
("+stringMatchesToCreate.size()+")", stepState.onUpdate, ON_UPDATE_WORKFLOW,
+                "Calling on_update on item-matched children 
("+onUpdateTargets.size()+")", stepState.onUpdate, ON_UPDATE_WORKFLOW,
                 () -> new CustomWorkflowStep(MutableList.of(
                         MutableMap.of(
                                 "step", "invoke-effector on_update",
@@ -356,7 +359,7 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
 
         List<Entity> entitiesToPossiblyDelete = 
MutableList.copyOf(oldChildren.values());
         List<TransientEntityReference> deletionChecks = 
runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, 
instructionsForResuming, subworkflowTargetForResuming,
-                "Creating new children ("+stringMatchesToCreate.size()+")", 
stepState.deletionCheck, DELETION_CHECK_WORKFLOW,
+                "Checking old children ("+entitiesToPossiblyDelete.size()+") 
for deletion", stepState.deletionCheck, DELETION_CHECK_WORKFLOW,
                 () -> new CustomWorkflowStep(MutableList.of("return true")),
                 checkWorkflow -> outerWorkflowCustomers.apply(checkWorkflow,
                         foreach -> {
@@ -375,7 +378,7 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
 
         List<Map<String,Object>> onDeleteTargets = (List) 
deletionChecks.stream().map(t -> t.getEntity(mgmt)).filter(x -> x!=null).map(x 
-> MutableMap.of("child", x)).collect(Collectors.toList());
         runOrResumeSubWorkflowForPhaseOrReturnPreviousIfCompleted(context, 
instructionsForResuming, subworkflowTargetForResuming,
-                "Calling onCreate on newly created children 
("+stringMatchesToCreate.size()+")", stepState.onDelete, ON_DELETE_WORKFLOW,
+                "Calling on_delete on children to delete 
("+onDeleteTargets.size()+")", stepState.onDelete, ON_DELETE_WORKFLOW,
                 () -> new CustomWorkflowStep(MutableList.of(
                         MutableMap.of(
                                 "step", "invoke-effector on_delete",
@@ -419,15 +422,16 @@ public class UpdateChildrenWorkflowStep extends 
WorkflowStepDefinition implement
                         name,
                         outerWorkflowConfig, null, null, null);
                 stepSubState.workflowTag = 
BrooklynTaskTags.tagForWorkflow(matchWorkflow);
-                WorkflowReplayUtils.addNewSubWorkflow(context, 
stepState.matchCheck.workflowTag);
+                WorkflowReplayUtils.addNewSubWorkflow(context, 
stepSubState.workflowTag);
                 setStepState(context, stepState);
 
                 stepSubState.result = postprocess.apply((List) 
DynamicTasks.queue(matchWorkflow.getTask(true).get()).getUnchecked());
             } else {
                 stepSubState.result = postprocess.apply((List) 
WorkflowReplayUtils.replayResumingInSubWorkflow("workflow effector", context, 
subworkflowTargetForResuming, instructionsForResuming,
                         (w, e)-> {
-                            LOG.debug("Sub workflow "+w+" is not replayable; 
running anew ("+ Exceptions.collapseText(e)+")");
-                            return doTaskBody(context);
+                            throw Exceptions.propagate(e);
+                            //LOG.debug("Sub workflow "+w+" is not replayable; 
running anew ("+ Exceptions.collapseText(e)+")");
+                            //return doTaskBody(context);
                         }, true));
             }
             stepSubState.workflowTag = null;
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
index c8b1411003..b5389fcb30 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
@@ -306,8 +306,9 @@ public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<Test
                 WorkflowExecutionContext lastWf = new 
WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(app).get(wfId);
                 log.info("replaying from last");
                 lastInvocation = 
lastWf.factory(false).createTaskReplaying(lastWf.factory(false)
-//                        
.makeInstructionsForReplayingFromLastReplayable("test", true));
                         .makeInstructionsForReplayResuming("test", true));
+                // can also test this, but less interesting
+//                        
.makeInstructionsForReplayingFromLastReplayable("test", true));
                 Entities.submit(app, lastInvocation);
             }
             log.info("success for iteration "+(i+1)+"\n");
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java
index 1695c41be7..50604ab796 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowUpdateChildrenStepTest.java
@@ -18,21 +18,35 @@
  */
 package org.apache.brooklyn.core.workflow;
 
+import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.EffectorBody;
+import org.apache.brooklyn.core.effector.Effectors;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAsserts;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport;
 import org.apache.brooklyn.entity.stock.BasicApplication;
 import org.apache.brooklyn.entity.stock.BasicEntity;
+import org.apache.brooklyn.entity.stock.BasicEntityImpl;
+import org.apache.brooklyn.entity.stock.BasicStartable;
 import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.text.Strings;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -100,7 +114,7 @@ public class WorkflowUpdateChildrenStepTest extends 
BrooklynMgmtUnitTestSupport
                         "    x_name: name2",
                         "- step: update-children id ${item.x_id} from 
${items}",
                         "  blueprint:",
-                        "    type: "+BasicEntity.class.getName(),
+                        "    type: " + BasicEntity.class.getName(),
                         "    name: ${item.x_name}",
                         ""),
                 "first run at children");
@@ -147,7 +161,6 @@ public class WorkflowUpdateChildrenStepTest extends 
BrooklynMgmtUnitTestSupport
                 e -> Asserts.expectedFailureContainsIgnoreCase(e, 
"non-static", "item"));
     }
 
-
     @Test
     public void testCrudWithHandlers() {
         WorkflowExecutionContext execution = 
WorkflowBasicTest.runWorkflow(app, Strings.lines(
@@ -227,4 +240,62 @@ public class WorkflowUpdateChildrenStepTest extends 
BrooklynMgmtUnitTestSupport
         EntityAsserts.assertAttributeEquals(app, 
Sensors.newStringSensor("deleted"), "name3");
     }
 
-}
+    static class UpdateableChildThing extends BasicEntityImpl {
+
+        static final AttributeSensor<String> BLOCKAGE = 
Sensors.newStringSensor("blockage");
+
+        @Override
+        public void init() {
+            super.init();
+            WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+                    .configure(WorkflowEffector.EFFECTOR_NAME, "on_update")
+                    .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS,
+                            MutableMap.of("items", null))
+                    .configure(WorkflowEffector.STEPS, MutableList.of(
+                            MutableMap.of("condition", MutableMap.of("sensor", 
"blockage"),
+                                    "s", "fail message Blockage 
${entity.sensor.blockage}"),
+                            "set-entity-name ${item.x_name}"))
+                    .configure(WorkflowEffector.REPLAYABLE, "from start")
+                    );
+            eff.apply(this);
+        }
+    }
+
+    @Test
+    public void testCrudHandlersAsEffectorsReplaying() {
+        WorkflowBasicTest.addRegisteredTypeSpec(mgmt, "updateable-thing", 
UpdateableChildThing.class, Entity.class);
+        String updateWorkflow = Strings.lines(
+                "- update-children type updateable-thing id ${item.x_id} from 
${entity.sensor.items}"
+        );
+        Runnable update = () -> WorkflowBasicTest.runWorkflow(app, 
updateWorkflow, "test").getTask(false).get().getUnchecked();
+
+        app.sensors().set(Sensors.newSensor(Object.class, "items"),
+                MutableList.of(MutableMap.of("x_id", "one", "x_name", 
"name1"), MutableMap.of("x_id", "two", "x_name", "name2")));
+        update.run();
+        Set<String> childrenIds = app.getChildren().stream().map(c -> 
c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet());
+        Asserts.assertEquals(childrenIds, MutableSet.of("one", "two"));
+        Set<String> childrenNames = app.getChildren().stream().map(c -> 
c.getDisplayName()).collect(Collectors.toSet());
+        Asserts.assertEquals(childrenNames, MutableSet.of("name1", "name2"));
+
+        // setting a sensor to fail in a sub-workflow should fail the update
+        Entity child2 = Iterables.get(app.getChildren(), 1);
+        child2.sensors().set(UpdateableChildThing.BLOCKAGE, "blocked!");
+        WorkflowExecutionContext w1 = WorkflowBasicTest.runWorkflow(app, 
updateWorkflow, "test");
+        Asserts.assertFailsWith(() -> w1.getTask(true).get().getUnchecked(),
+            e -> Asserts.expectedFailureContainsIgnoreCase(e, "blocked!"));
+        child2.sensors().set(UpdateableChildThing.BLOCKAGE, null);
+        WorkflowExecutionContext.Factory w1f = w1.factory(false);
+        // but we can resume and this time it works
+        Task<Object> w1r = Entities.submit(app, 
w1f.createTaskReplaying(w1f.makeInstructionsForReplayResuming("blockage fixed", 
false)));
+        w1r.getUnchecked();
+        // and we still have the two children
+        childrenIds = app.getChildren().stream().map(c -> 
c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet());
+        Asserts.assertEquals(childrenIds, MutableSet.of("one", "two"));
+
+        app.sensors().set(Sensors.newSensor(Object.class, "items"), 
MutableList.of());
+        update.run();
+        childrenIds = app.getChildren().stream().map(c -> 
c.config().get(BrooklynConfigKeys.PLAN_ID)).collect(Collectors.toSet());
+        Asserts.assertSize(childrenIds, 0);
+    }
+
+}
\ No newline at end of file

Reply via email to