This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 4309fff796aad730f8ec1d538730073471b6aafe
Author: Alex Heneveld <[email protected]>
AuthorDate: Mon May 1 12:06:04 2023 +0100

    tidy nested workflow error handling, name, and inclusion as a child
---
 .../core/workflow/steps/CustomWorkflowStep.java    | 37 +++++++---
 .../WorkflowNestedAndCustomExtensionTest.java      | 79 +++++++++++++++++++++-
 2 files changed, 103 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index efc51fc185..592968790a 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -57,6 +57,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class CustomWorkflowStep extends WorkflowStepDefinition implements 
WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization, 
WorkflowStepDefinition.WorkflowStepDefinitionWithSubWorkflow {
@@ -203,7 +204,7 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
 
         AtomicInteger index = new AtomicInteger(0);
         ((Iterable<?>) targetR).forEach(t -> {
-            WorkflowExecutionContext nw = newWorkflow(context, t, 
index.getAndIncrement());
+            WorkflowExecutionContext nw = newWorkflow(context, t, wasList ? 
index.getAndIncrement() : null);
             Maybe<Task<Object>> mt = nw.getTask(true);
 
             String targetS = wasList || t !=null ? " for target '"+t+"'" : "";
@@ -298,9 +299,8 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
                                     }
                                 }),
                                 task);
-                    } else {
-                        DynamicTasks.queue(task);
                     }
+                    DynamicTasks.queue(task);
                     submitted.add(task);
                 }
             }
@@ -308,6 +308,11 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
 
         submitted.forEach(t -> {
             try {
+                if (!t.isSubmitted() && !errors.isEmpty()) {
+                    // if concurrent, all tasks will be submitted, and we 
should wait;
+                    // if not, then there might be queued tasks not yet 
submitted; if there are errors, they will never be submitted
+                    return;
+                }
                 t.get();
             } catch (Throwable tt) {
                 errors.add(tt);
@@ -321,7 +326,7 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
         context.setOutput(result);
 
         if (!errors.isEmpty()) {
-            throw Exceptions.propagate("Error running sub-workflows in 
"+context.getWorkflowStepReference(), errors);
+            throw Exceptions.propagate("Error"+(errors.size()>1 ? "s" : "")+" 
running sub-workflow"+(nestedWorkflowContexts.size()>1 ? "s" : "")+" in 
"+context.getWorkflowStepReference(), errors);
         }
 
         return !wasList ? Iterables.getOnlyElement(result) : result;
@@ -352,7 +357,10 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
     }
 
     public String getNameOrDefault() {
-        return (Strings.isNonBlank(getName()) ? getName() : 
Strings.isNonBlank(shorthandTypeName) ? shorthandTypeName : "custom step");
+        return getNameOrDefault(() -> Strings.isNonBlank(shorthandTypeName) ? 
shorthandTypeName : "custom step");
+    }
+    public String getNameOrDefault(Supplier<String> defaultSupplier) {
+        return Strings.isNonBlank(getName()) ? getName() : 
defaultSupplier==null ? null : defaultSupplier.get();
     }
 
     @Override
@@ -397,18 +405,25 @@ public class CustomWorkflowStep extends 
WorkflowStepDefinition implements Workfl
         return result;
     }
 
-    private WorkflowExecutionContext 
newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, int 
targetIndex) {
+    private WorkflowExecutionContext 
newWorkflow(WorkflowStepInstanceExecutionContext context, Object target, 
Integer targetIndexOrNull) {
         if (steps==null) throw new IllegalArgumentException("Cannot make new 
workflow with no steps");
 
+        String indexName = targetIndexOrNull==null ? "" : " 
"+(targetIndexOrNull+1);
+        String name = getNameOrDefault(null);
+        name = (name == null ? "Sub-workflow" + indexName : 
"Sub-workflow"+indexName+" for " + name);
+        String targetString = target==null ? null : target.toString();
+        if (targetString!=null && targetString.length()<60 && 
!Strings.isMultiLine(targetString)) name += " ("+targetString+")";
+
         WorkflowExecutionContext nestedWorkflowContext = 
WorkflowExecutionContext.newInstanceUnpersistedWithParent(
                 target instanceof BrooklynObject ? (BrooklynObject) target : 
context.getEntity(), context.getWorkflowExectionContext(),
-                WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW, 
"Workflow for " + getNameOrDefault(),
+                WorkflowExecutionContext.WorkflowContextType.NESTED_WORKFLOW,
+                name,
                 getConfigForSubWorkflow(false), null,
                 ConfigBag.newInstance(getInput()), null);
-        if (target!=null) {
-            nestedWorkflowContext.getWorkflowScratchVariables().put("target", 
target);
-            
nestedWorkflowContext.getWorkflowScratchVariables().put("target_index", 
targetIndex);
-        }
+
+        nestedWorkflowContext.getWorkflowScratchVariables().put("target", 
target);
+        if (targetIndexOrNull!=null) 
nestedWorkflowContext.getWorkflowScratchVariables().put("target_index", 
targetIndexOrNull);
+
         return nestedWorkflowContext;
     }
 
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
index 24f387fa1f..289184b37e 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowNestedAndCustomExtensionTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.HasTaskChildren;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.typereg.RegisteredType;
@@ -90,6 +91,7 @@ public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<Test
 
     ClassLogWatcher lastLogWatcher;
     TestApplication app;
+    Task<?> lastInvocation;
 
     Object invokeWorkflowStepsWithLogging(List<Object> steps) throws Exception 
{
         return invokeWorkflowStepsWithLogging(steps, null);
@@ -106,8 +108,8 @@ public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<Test
                     .putAll(extraEffectorConfig));
             eff.apply((EntityLocal)app);
 
-            Task<?> invocation = 
app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
-            return invocation.getUnchecked();
+            lastInvocation = 
app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), null);
+            return lastInvocation.getUnchecked();
         }
     }
 
@@ -705,4 +707,77 @@ public class WorkflowNestedAndCustomExtensionTest extends 
RebindTestFixture<Test
         }
     }
 
+
+    @Test
+    public void testConcurrentNestedWorkflowsShowAsChildren() throws Exception 
{
+        Object result = invokeWorkflowStepsWithLogging((List<Object>) 
Iterables.getOnlyElement(Yamls.parseAll(Strings.lines(
+                "  - type: workflow",
+                "    target:",
+                "      - A",
+                "      - B",
+                "    concurrency: all",
+                "    steps:",
+                "    - set-sensor s_${target_index} = ${target}",
+                "    - return ${target}",
+                ""
+        ))));
+
+        // ensure it ran
+        Asserts.assertEquals(MutableSet.of("A", "B"), 
MutableSet.copyOf((Iterable)result));
+        EntityAsserts.assertAttributeEquals(app, 
Sensors.newStringSensor("s_0"), "A");
+        EntityAsserts.assertAttributeEquals(app, 
Sensors.newStringSensor("s_1"), "B");
+
+        Task<?> firstStep = Iterables.getOnlyElement( ((HasTaskChildren) 
lastInvocation).getChildren() );
+        Iterable<Task<?>> subworkflows = ((HasTaskChildren) 
firstStep).getChildren();
+        Asserts.assertSize(subworkflows, 2);
+    }
+
+    @Test
+    public void testNestedWorkflowsFail() throws Exception {
+        Asserts.assertFailsWith(() -> 
invokeWorkflowStepsWithLogging((List<Object>) 
Iterables.getOnlyElement(Yamls.parseAll(Strings.lines(
+                "  - type: workflow",
+                "    target:",
+                "      - A",
+                "      - B",
+                "    steps:",
+                "    - fail message deliberate failure on ${target}",
+                ""
+        )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "error 
running sub-workflows ", "deliberate failure on A"));
+
+        Asserts.assertFailsWith(() -> 
invokeWorkflowStepsWithLogging((List<Object>) 
Iterables.getOnlyElement(Yamls.parseAll(Strings.lines(
+                "  - type: workflow",
+                "    target:",
+                "      - A",
+                "      - B",
+                "    concurrency: all",
+                "    steps:",
+                "    - fail message deliberate failure on ${target}",
+                ""
+        )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, 
"errors running sub-workflows ", "2 errors including", "deliberate failure on 
A"));
+
+        Asserts.assertFailsWith(() -> 
invokeWorkflowStepsWithLogging((List<Object>) 
Iterables.getOnlyElement(Yamls.parseAll(Strings.lines(
+                "  - type: workflow",
+                "    target:",
+                "      - A",
+                "    concurrency: all",
+                "    steps:",
+                "    - fail message deliberate failure on ${target}",
+                ""
+        )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "error 
running sub-workflow ", "deliberate failure on A"));
+
+        Asserts.assertFailsWith(() -> 
invokeWorkflowStepsWithLogging((List<Object>) 
Iterables.getOnlyElement(Yamls.parseAll(Strings.lines(
+                "  - type: workflow",
+                "    target:",
+                "      - A",
+                "      - B",
+                "      - C",
+                "    steps:",
+                "    - condition:",
+                "        target: ${target}",
+                "        equals: B",
+                "      step: fail message deliberate failure on ${target}",
+                ""
+        )))), error -> Asserts.expectedFailureContainsIgnoreCase(error, "error 
running sub-workflows ", "deliberate failure on B"));
+    }
+
 }

Reply via email to