This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 4309fff796aad730f8ec1d538730073471b6aafe Author: Alex Heneveld <[email protected]> AuthorDate: Mon May 1 12:06:04 2023 +0100 tidy nested workflow error handling, name, and inclusion as a child --- .../core/workflow/steps/CustomWorkflowStep.java | 37 +++++++--- .../WorkflowNestedAndCustomExtensionTest.java | 79 +++++++++++++++++++++- 2 files changed, 103 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index efc51fc185..592968790a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -57,6 +57,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; public class CustomWorkflowStep extends WorkflowStepDefinition implements WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization, WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow { @@ -203,7 +204,7 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl AtomicInteger index = new AtomicInteger(0); ((Iterable<?>) targetR).forEach(t -> { - WorkflowExecutionContext nw = newWorkflow(context, t, index.getAndIncrement()); + WorkflowExecutionContext nw = newWorkflow(context, t, wasList ? index.getAndIncrement() : null); Maybe<Task<Object>> mt = nw.getTask(true); String targetS = wasList || t !=null ? " for target '"+t+"'" : ""; @@ -298,9 +299,8 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl } }), task); - } else { - DynamicTasks.queue(task); } + DynamicTasks.queue(task); submitted.add(task); } } @@ -308,6 +308,11 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl submitted.forEach(t -> { try { + if (!t.isSubmitted() && !errors.isEmpty()) { + // if concurrent, all tasks will be submitted, and we should wait; + // if not, then there might be queued tasks not yet submitted; if there are errors, they will never be submitted + return; + } t.get(); } catch (Throwable tt) { errors.add(tt); @@ -321,7 +326,7 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl context.setOutput(result); if (!errors.isEmpty()) { - throw Exceptions.propagate("Error running sub-workflows in "+context.getWorkflowStepReference(), errors); + throw Exceptions.propagate("Error"+(errors.size()>1 ? "s" : "")+" running sub-workflow"+(nestedWorkflowContexts.size()>1 ? "s" : "")+" in "+context.getWorkflowStepReference(), errors); } return !wasList ? Iterables.getOnlyElement(result) : result; @@ -352,7 +357,10 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl } public String getNameOrDefault() { - return (Strings.isNonBlank(getName()) ? getName() : Strings.isNonBlank(shorthandTypeName) ? shorthandTypeName : "custom step"); + return getNameOrDefault(() -> Strings.isNonBlank(shorthandTypeName) ? shorthandTypeName : "custom step"); + } + public String getNameOrDefault(Supplier<String> defaultSupplier) { + return Strings.isNonBlank(getName()) ? getName() : defaultSupplier==null ? null : defaultSupplier.get(); } @Override @@ -397,18 +405,25 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl return result; } - private WorkflowExecutionContext newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, int targetIndex) { + private WorkflowExecutionContext newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, Integer targetIndexOrNull) { if (steps==null) throw new IllegalArgumentException("Cannot make new workflow with no steps"); + String indexName = targetIndexOrNull==null ? "" : " "+(targetIndexOrNull+1); + String name = getNameOrDefault(null); + name = (name == null ? "Sub-workflow" + indexName : "Sub-workflow"+indexName+" for " + name); + String targetString = target==null ? null : target.toString(); + if (targetString!=null && targetString.length()<60 && !Strings.isMultiLine(targetString)) name += " ("+targetString+")"; + WorkflowExecutionContext nestedWorkflowContext = WorkflowExecutionContext.newInstanceUnpersistedWithParent( target instanceof BrooklynObject ? (BrooklynObject) target : context.getEntity(), context.getWorkflowExectionContext(), - WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW, "Workflow for " + getNameOrDefault(), + WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW, + name, getConfigForSubWorkflow(false), null, ConfigBag.newInstance(getInput()), null); - if (target!=null) { - nestedWorkflowContext.getWorkflowScratchVariables().put("target", target); - nestedWorkflowContext.getWorkflowScratchVariables().put("target_index", targetIndex); - } + + nestedWorkflowContext.getWorkflowScratchVariables().put("target", target); + if (targetIndexOrNull!=null) nestedWorkflowContext.getWorkflowScratchVariables().put("target_index", targetIndexOrNull); + return nestedWorkflowContext; } diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java index 24f387fa1f..289184b37e 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.mgmt.HasTaskChildren; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.typereg.RegisteredType; @@ -90,6 +91,7 @@ public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<Test ClassLogWatcher lastLogWatcher; TestApplication app; + Task<?> lastInvocation; Object invokeWorkflowStepsWithLogging(List<Object> steps) throws Exception { return invokeWorkflowStepsWithLogging(steps, null); @@ -106,8 +108,8 @@ public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<Test .putAll(extraEffectorConfig)); eff.apply((EntityLocal)app); - Task<?> invocation = app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null); - return invocation.getUnchecked(); + lastInvocation = app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null); + return lastInvocation.getUnchecked(); } } @@ -705,4 +707,77 @@ public class WorkflowNestedAndCustomExtensionTest extends RebindTestFixture<Test } } + + @Test + public void testConcurrentNestedWorkflowsShowAsChildren() throws Exception { + Object result = invokeWorkflowStepsWithLogging((List<Object>) Iterables.getOnlyElement(Yamls.parseAll(Strings.lines( + " - type: workflow", + " target:", + " - A", + " - B", + " concurrency: all", + " steps:", + " - set-sensor s_${target_index} = ${target}", + " - return ${target}", + "" + )))); + + // ensure it ran + Asserts.assertEquals(MutableSet.of("A", "B"), MutableSet.copyOf((Iterable)result)); + EntityAsserts.assertAttributeEquals(app, Sensors.newStringSensor("s_0"), "A"); + EntityAsserts.assertAttributeEquals(app, Sensors.newStringSensor("s_1"), "B"); + + Task<?> firstStep = Iterables.getOnlyElement( ((HasTaskChildren) lastInvocation).getChildren() ); + Iterable<Task<?>> subworkflows = ((HasTaskChildren) firstStep).getChildren(); + Asserts.assertSize(subworkflows, 2); + } + + @Test + public void testNestedWorkflowsFail() throws Exception { + Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging((List<Object>) Iterables.getOnlyElement(Yamls.parseAll(Strings.lines( + " - type: workflow", + " target:", + " - A", + " - B", + " steps:", + " - fail message deliberate failure on ${target}", + "" + )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "error running sub-workflows ", "deliberate failure on A")); + + Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging((List<Object>) Iterables.getOnlyElement(Yamls.parseAll(Strings.lines( + " - type: workflow", + " target:", + " - A", + " - B", + " concurrency: all", + " steps:", + " - fail message deliberate failure on ${target}", + "" + )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "errors running sub-workflows ", "2 errors including", "deliberate failure on A")); + + Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging((List<Object>) Iterables.getOnlyElement(Yamls.parseAll(Strings.lines( + " - type: workflow", + " target:", + " - A", + " concurrency: all", + " steps:", + " - fail message deliberate failure on ${target}", + "" + )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "error running sub-workflow ", "deliberate failure on A")); + + Asserts.assertFailsWith(() -> invokeWorkflowStepsWithLogging((List<Object>) Iterables.getOnlyElement(Yamls.parseAll(Strings.lines( + " - type: workflow", + " target:", + " - A", + " - B", + " - C", + " steps:", + " - condition:", + " target: ${target}", + " equals: B", + " step: fail message deliberate failure on ${target}", + "" + )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "error running sub-workflows ", "deliberate failure on B")); + } + }
