This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 4062c3ab7deb7a06718991e64a95f5e12b735b9f Author: Alex Heneveld <[email protected]> AuthorDate: Fri Oct 21 11:42:04 2022 +0100 misc workflow fixes - more details, including error and rest calls --- .../brooklyn/core/workflow/WorkflowExecutionContext.java | 13 +++++++++++-- .../brooklyn/core/workflow/WorkflowReplayUtils.java | 14 +++++++++++++- .../workflow/WorkflowStepInstanceExecutionContext.java | 3 +++ .../brooklyn/core/workflow/steps/CustomWorkflowStep.java | 14 ++++++++++++-- .../brooklyn/core/workflow/steps/SshWorkflowStep.java | 4 ++-- .../org/apache/brooklyn/util/core/task/TaskBuilder.java | 15 ++++++++++----- .../apache/brooklyn/rest/resources/EntityResource.java | 5 ++++- .../software/base/WorkflowSoftwareProcessSshDriver.java | 2 +- 8 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index 90e32abd10..76752c672f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -134,6 +134,10 @@ public class WorkflowExecutionContext { String previousStepTaskId; WorkflowStepInstanceExecutionContext currentStepInstance; + + /** set if an error handler is the last thing which ran */ + String errorHandlerTaskId; + /** set for the last _step_ inside the error handler */ WorkflowStepInstanceExecutionContext errorHandlerContext; Map<Integer, OldStepRecord> oldStepInfo = MutableMap.of(); @@ -773,7 +777,9 @@ public class WorkflowExecutionContext { WorkflowErrorHandling.WorkflowErrorHandlingResult result = null; try { log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', running error handler"); - result = DynamicTasks.queue(WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e)).getUnchecked(); + Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e); + errorHandlerTaskId = workflowErrorHandlerTask.getId(); + result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked(); if (result != null) { errorHandled = true; @@ -899,6 +905,7 @@ public class WorkflowExecutionContext { // and update replayable info WorkflowReplayUtils.updateOnWorkflowStepChange(currentStepRecord, currentStepInstance, step); errorHandlerContext = null; + errorHandlerTaskId = null; persist(); @@ -929,7 +936,9 @@ public class WorkflowExecutionContext { if (!step.onError.isEmpty()) { WorkflowErrorHandling.WorkflowErrorHandlingResult result; try { - result = DynamicTasks.queue(WorkflowErrorHandling.createStepErrorHandlerTask(step, currentStepInstance, t, e)).getUnchecked(); + Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> stepErrorHandlerTask = WorkflowErrorHandling.createStepErrorHandlerTask(step, currentStepInstance, t, e); + currentStepInstance.errorHandlerTaskId = stepErrorHandlerTask.getId(); + result = DynamicTasks.queue(stepErrorHandlerTask).getUnchecked(); if (result!=null) { if (Strings.isNonBlank(result.next)) customNext.set(result.next); saveOutput.accept(result.output); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java index 30112ea2c3..d1449cff01 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.google.common.collect.Iterables; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.function.Supplier; @@ -75,8 +77,18 @@ public class WorkflowReplayUtils { log.warn("Mismatch in workflow replays for "+ctx+": "+ctx.replayCurrent +" vs "+task); return; } - ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId(); + + // try hard to get submitter data in case tasks go awol before execution + if (task.getSubmittedByTaskId()!=null) { + ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId(); + } else if (ctx.replayCurrent.submittedByTaskId==null && Tasks.current()!=null && !Tasks.current().equals(task)) { + ctx.replayCurrent.submittedByTaskId = Tasks.current().getId(); + } ctx.replayCurrent.submitTimeUtc = task.getSubmitTimeUtc(); + // fake this because we won't see the real value until we also see the start value. + // however we need to ensure any workflow that is created is intended to be run. + if (ctx.replayCurrent.submitTimeUtc<=0) ctx.replayCurrent.submitTimeUtc = Instant.now().toEpochMilli(); + ctx.replayCurrent.startTimeUtc = task.getStartTimeUtc(); ctx.replayCurrent.endTimeUtc = task.getEndTimeUtc(); ctx.replayCurrent.status = task.getStatusSummary(); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java index b3bc5065f5..d9ea83f774 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java @@ -61,6 +61,9 @@ public class WorkflowStepInstanceExecutionContext { /** set if the step is in an error handler context, containing the error being handled */ Throwable error; + /** set if there was an error handled locally */ + String errorHandlerTaskId; + public String getTaskId() { return taskId; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java index 4f36809a5e..c1e313f78c 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java @@ -28,6 +28,7 @@ import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext; +import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils; import org.apache.brooklyn.core.resolve.jackson.JsonPassThroughDeserializer; @@ -168,14 +169,23 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl return result; } + /** Returns a top-level workflow running the workflow defined here */ public WorkflowExecutionContext newWorkflowExecution(Entity entity, String name, ConfigBag extraConfig) { + return newWorkflowExecution(entity, name, extraConfig, null); + } + public WorkflowExecutionContext newWorkflowExecution(Entity entity, String name, ConfigBag extraConfig, Map extraTaskFlags) { return WorkflowExecutionContext.newInstancePersisted(entity, name, ConfigBag.newInstance() .configure(WorkflowCommonConfig.PARAMETER_DEFS, parameters) .configure(WorkflowCommonConfig.STEPS, steps) - .configure(WorkflowCommonConfig.OUTPUT, workflowOutput), + .configure(WorkflowCommonConfig.INPUT, input) + .configure(WorkflowCommonConfig.OUTPUT, workflowOutput) + .configure(WorkflowCommonConfig.REPLAYABLE, replayable) + .configure(WorkflowCommonConfig.ON_ERROR, onError) + .configure(WorkflowCommonConfig.TIMEOUT, timeout) + .configure((ConfigKey) WorkflowCommonConfig.CONDITION, condition), null, - ConfigBag.newInstance(getInput()).putAll(extraConfig), null); + ConfigBag.newInstance(getInput()).putAll(extraConfig), extraTaskFlags); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java index ba03791962..e16b4559b0 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java @@ -87,7 +87,7 @@ public class SshWorkflowStep extends WorkflowStepDefinition { protected static void checkExitCode(ProcessTaskWrapper<?> ptw, DslPredicates.DslPredicate<Integer> exitcode) { if (exitcode==null) { - if (ptw.getExitCode()!=0) throw new IllegalStateException("Invalid exit code '"+ptw.getExitCode()+"'"); + if (ptw.getExitCode()!=0) throw new IllegalStateException("Invalid exit code "+ptw.getExitCode()); return; } @@ -103,7 +103,7 @@ public class SshWorkflowStep extends WorkflowStepDefinition { // ranges still require `exit-code: { range: [0, 4] }`, same with `exit-code: { less-than: 5 }`. } if (!exitcode.apply(ptw.getExitCode())) { - throw new IllegalStateException("Invalid exit code '"+ptw.getExitCode()+"'; does not match explicit exit-code requirement"); + throw new IllegalStateException("Invalid exit code "+ptw.getExitCode()+"; does not match explicit exit-code requirement"); } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java index c6177999bb..79624125d0 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java @@ -18,10 +18,7 @@ */ package org.apache.brooklyn.util.core.task; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; import org.apache.brooklyn.api.mgmt.Task; @@ -34,10 +31,14 @@ import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Convenience for creating tasks; note that DynamicSequentialTask is the default */ public class TaskBuilder<T> { + private static final Logger log = LoggerFactory.getLogger(TaskBuilder.class); + String displayName = null; String description = null; Callable<T> body = null; @@ -152,7 +153,11 @@ public class TaskBuilder<T> { MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags); if (displayName!=null) taskFlags.put("displayName", displayName); if (description!=null) taskFlags.put("description", description); - if (!tags.isEmpty()) taskFlags.put("tags", tags); + if (!tags.isEmpty()) { + Object otherTags = taskFlags.put("tags", tags); + if (otherTags instanceof Collection) tags.addAll((Collection)otherTags); + else log.warn("Ignoring unexpected 'tags' flag in task: "+otherTags); + } if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) { if (swallowChildrenFailures!=null) diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java index 84495ea37b..81cacb01fd 100644 --- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java +++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java @@ -64,6 +64,7 @@ import org.apache.brooklyn.rest.transform.TaskTransformer; import org.apache.brooklyn.rest.util.EntityRelationUtils; import org.apache.brooklyn.rest.util.WebResourceUtils; import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.ClassLoaderUtils; import org.apache.brooklyn.util.core.ResourceUtils; import org.apache.brooklyn.util.core.flags.TypeCoercions; @@ -425,7 +426,9 @@ public class EntityResource extends AbstractBrooklynRestResource implements Enti } WorkflowExecutionContext execution = workflow.newWorkflowExecution(target, - Strings.firstNonBlank(workflow.getName(), workflow.getId(), "API workflow invocation"), null); + Strings.firstNonBlank(workflow.getName(), workflow.getId(), "API workflow invocation"), + null, + MutableMap.of("tags", MutableMap.of("workflow_yaml", yaml))); Task<Object> task = Entities.submit(target, execution.getTask(true).get()); task.blockUntilEnded(timeoutS==null ? Duration.millis(20) : Duration.of(timeoutS)); diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java index c8bb664ea2..6a7108eb11 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java @@ -93,7 +93,7 @@ public class WorkflowSoftwareProcessSshDriver extends AbstractSoftwareProcessSsh WorkflowExecutionContext workflowContext = workflow.newWorkflowExecution(entity, key.getName().toLowerCase(), null /* could getInput from workflow, and merge shell environment here */); - return Maybe.of(DynamicTasks.queue( workflowContext.getTask(true).get() ).getUnchecked()); + return Maybe.of(DynamicTasks.queueIfPossible( workflowContext.getTask(true).get() ).orSubmitAsync(entity).getTask().getUnchecked()); } @Override
