This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit b35bab6dcd8f74ea727f9a1ea0fe9da90a260802 Author: Alex Heneveld <[email protected]> AuthorDate: Fri Oct 21 14:55:06 2022 +0100 workflow basic expiry, and more workflow fixes --- .../core/workflow/WorkflowErrorHandling.java | 2 + .../core/workflow/WorkflowExecutionContext.java | 59 ++++++++++++++++++++-- .../core/workflow/WorkflowStepDefinition.java | 3 ++ .../core/workflow/steps/RetryWorkflowStep.java | 6 ++- .../store/WorkflowStatePersistenceViaSensors.java | 29 +++++++++-- .../brooklyn/util/core/task/TaskBuilder.java | 5 +- .../workflow/WorkflowPersistReplayErrorsTest.java | 4 +- 7 files changed, 96 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java index f17d25792d..532ab13ee4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java @@ -52,6 +52,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor log.debug("Encountered error in step "+context.getWorkflowStepReference()+" '" + stepTask.getDisplayName() + "' (handler present): " + Exceptions.collapseText(error)); Task<WorkflowErrorHandlingResult> task = Tasks.<WorkflowErrorHandlingResult>builder().dynamic(true).displayName(context.getWorkflowStepReference()+"-error-handler") .tag(BrooklynTaskTags.tagForWorkflowStepErrorHandler(context, null, context.getTaskId())) + .tag(BrooklynTaskTags.WORKFLOW_TAG) .body(new WorkflowErrorHandling(step.getOnError(), context.getWorkflowExectionContext(), context.getWorkflowExectionContext().currentStepIndex, stepTask, error)) .build(); TaskTags.addTagDynamically(stepTask, BrooklynTaskTags.tagForErrorHandledBy(task)); @@ -64,6 +65,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor log.debug("Encountered error in workflow 
"+context.getWorkflowId()+"/"+context.getTaskId()+" '" + workflowTask.getDisplayName() + "' (handler present): " + Exceptions.collapseText(error)); Task<WorkflowErrorHandlingResult> task = Tasks.<WorkflowErrorHandlingResult>builder().dynamic(true).displayName(context.getWorkflowId()+"-error-handler") .tag(BrooklynTaskTags.tagForWorkflowStepErrorHandler(context)) + .tag(BrooklynTaskTags.WORKFLOW_TAG) .body(new WorkflowErrorHandling(context.onError, context, null, workflowTask, error)) .build(); TaskTags.addTagDynamically(workflowTask, BrooklynTaskTags.tagForErrorHandledBy(task)); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index 76752c672f..2d2eb1688e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -20,7 +20,9 @@ package org.apache.brooklyn.core.workflow; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.collect.Iterables; import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ManagementContext; @@ -60,6 +62,7 @@ import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -121,6 +124,9 @@ public class WorkflowExecutionContext { String taskId; transient Task<Object> task; + @JsonProperty("expiryKey") + String expiryKey; + /** all tasks created for this workflow 
*/ Set<WorkflowReplayUtils.WorkflowReplayRecord> replays = MutableSet.of(); transient WorkflowReplayUtils.WorkflowReplayRecord replayCurrent = null; @@ -239,7 +245,7 @@ public class WorkflowExecutionContext { TaskBuilder<Object> tb = Tasks.builder().dynamic(true); if (optionalTaskFlags!=null) tb.flags(optionalTaskFlags); - else tb.displayName(name); + if (Strings.isBlank(tb.getDisplayName())) tb.displayName(name); task = tb.body(new Body()).build(); WorkflowReplayUtils.updateOnWorkflowStartOrReplay(this, task, "initial run", false); workflowId = taskId = task.getId(); @@ -564,6 +570,49 @@ public class WorkflowExecutionContext { return errorHandlerContext; } + @JsonIgnore + public String getExpiryKey() { + if (Strings.isNonBlank(expiryKey)) return expiryKey; + if (Strings.isNonBlank(getName())) return getName(); + return "anonymous-workflow"; + } + + /** look in tasks, steps, and replays to find most recent activity */ + public long getMostRecentActivityTime() { + AtomicLong result = new AtomicLong(-1); + + Consumer<Long> consider = l -> { + if (l!=null && l>result.get()) result.set(l); + }; + Consumer<Task> considerTask = task -> { + if (task!=null) { + consider.accept(task.getEndTimeUtc()); + consider.accept(task.getStartTimeUtc()); + consider.accept(task.getSubmitTimeUtc()); + } + }; + considerTask.accept(getTask(false).orNull()); + + Consumer<WorkflowReplayUtils.WorkflowReplayRecord> considerReplay = replay -> { + if (replay!=null) { + consider.accept(replay.endTimeUtc); + consider.accept(replay.startTimeUtc); + consider.accept(replay.submitTimeUtc); + } + }; + if (replayCurrent!=null) { + considerReplay.accept(replayCurrent); + } else if (!replays.isEmpty()) { + considerReplay.accept(Iterables.getLast(replays)); + } + + if (currentStepInstance!=null) { + considerTask.accept(getManagementContext().getExecutionManager().getTask(currentStepInstance.getTaskId())); + } + + return result.get(); + } + public List<Object> getStepsDefinition() { return 
MutableList.copyOf(stepsDefinition).asUnmodifiable(); } @@ -771,12 +820,12 @@ public class WorkflowExecutionContext { boolean errorHandled = false; if (isTimeout) { // don't run error handler - log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', throwing: " + Exceptions.collapseText(e)); + log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e)); } else if (onError != null && !onError.isEmpty() && provisionalStatus.persistable) { WorkflowErrorHandling.WorkflowErrorHandlingResult result = null; try { - log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', running error handler"); + log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", running error handler"); Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e); errorHandlerTaskId = workflowErrorHandlerTask.getId(); result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked(); @@ -797,13 +846,13 @@ public class WorkflowExecutionContext { } } catch (Exception e2) { - log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "' error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2)); + log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2)); log.debug("Full trace of original error was: " + e, e); e = e2; } } else { - log.debug("Error in 
workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', no error handler so rethrowing: " + Exceptions.collapseText(e)); + log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", no error handler so rethrowing: " + Exceptions.collapseText(e)); } if (!errorHandled) { diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java index 9a2d5b2d05..2026721992 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java @@ -58,6 +58,9 @@ public abstract class WorkflowStepDefinition { public String getName() { return name; } + public void setName(String name) { + this.name = name; + } protected String userSuppliedShorthand; protected String shorthandTypeName; diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java index 62746d2806..444aba31b4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java @@ -53,6 +53,10 @@ public class RetryWorkflowStep extends WorkflowStepDefinition { public static final ConfigKey<List<RetryLimit>> LIMIT = ConfigKeys.newConfigKey(new TypeToken<List<RetryLimit>>() {}, "limit"); public static final ConfigKey<RetryBackoff> BACKOFF = ConfigKeys.newConfigKey(RetryBackoff.class, "backoff"); + // if multiple retry steps declare the same key, their counts will be combined; used if the same error might be handled in different ways + // note that the limits and backoff _instructions_ apply only at the step where they are defined, so they may need to be defined 
at each step + public static final ConfigKey<String> KEY = ConfigKeys.newStringConfigKey("key"); + public enum RetryReplayOption { TRUE, FALSE, FORCE } public static class RetryLimit { @@ -188,7 +192,7 @@ public class RetryWorkflowStep extends WorkflowStepDefinition { @Override protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) { - String key = context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()); + String key = Strings.firstNonBlank(context.getInput(KEY), context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current())); List<Instant> retries = context.getWorkflowExectionContext().getRetryRecords().compute(key, (k, v) -> v != null ? v : MutableList.of()); List<RetryLimit> limit = context.getInput(LIMIT); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java index 6874ea357e..47e4390f17 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java @@ -26,17 +26,22 @@ import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.workflow.WorkflowErrorHandling; import org.apache.brooklyn.core.workflow.WorkflowExecutionContext; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.text.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.stream.Collectors; 
public class WorkflowStatePersistenceViaSensors { + private static final Logger log = LoggerFactory.getLogger(WorkflowStatePersistenceViaSensors.class); + public static final AttributeSensor<Map<String,WorkflowExecutionContext>> INTERNAL_WORKFLOWS = Sensors.newSensor(new TypeToken<Map<String, WorkflowExecutionContext>>() {}, "internals.brooklyn.workflow"); @@ -54,6 +59,24 @@ public class WorkflowStatePersistenceViaSensors { entity.sensors().modify(INTERNAL_WORKFLOWS, v -> { if (v == null) v = MutableMap.of(); v.put(context.getWorkflowId(), context); + String k = Strings.firstNonBlank(context.getExpiryKey(), "empty-expiry-key"); //should always be set + // TODO follow expiry instructions; for now, just keep 3 latest, apart from this one + List<WorkflowExecutionContext> finishedTwins = v.values().stream() + .filter(c -> k.equals(c.getExpiryKey())) + .filter(c -> c.getStatus()!=null && c.getStatus().ended) + .filter(c -> !c.equals(context)) + .collect(Collectors.toList()); + if (finishedTwins.size()>3) { + finishedTwins = MutableList.copyOf(finishedTwins); + Collections.sort(finishedTwins, (t1,t2) -> Long.compare(t2.getMostRecentActivityTime(), t1.getMostRecentActivityTime())); + Iterator<WorkflowExecutionContext> ti = finishedTwins.iterator(); + for (int i=0; i<3; i++) ti.next(); + while (ti.hasNext()) { + WorkflowExecutionContext w = ti.next(); + log.debug("Expiring old workflow "+w+" because it is finished and there are newer ones"); + v.remove(w.getWorkflowId()); + } + } return Maybe.of(v); }); mgmt.getRebindManager().forcePersistNow(false, null); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java index 79624125d0..069903f728 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java @@ -71,7 +71,10 @@ public class TaskBuilder<T> { this.displayName = 
displayName; return this; } - + public String getDisplayName() { + return displayName; + } + public TaskBuilder<T> description(String description) { this.description = description; return this; diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java index 049a69235d..b68d8bbb83 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java @@ -573,8 +573,8 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl m -> m.matches("Starting workflow 'Workflow for effector myWorkflow', moving to first step .*-1"), m -> m.matches("Starting step .*-1 in task .*"), m -> m.matches("Error in step '1 - invoke-effector does-not-exist', no error handler so rethrowing: No effector matching 'does-not-exist'"), - m -> m.matches("Error in workflow 'Workflow for effector myWorkflow' around step .*-1 'myWorkflow', running error handler"), - m -> m.matches("Encountered error in workflow .*/.* 'myWorkflow' .handler present.: No effector matching 'does-not-exist'"), + m -> m.matches("Error in workflow 'Workflow for effector myWorkflow' around step .*-1, running error handler"), + m -> m.matches("Encountered error in workflow .*/.* 'Workflow for effector myWorkflow' .handler present.: No effector matching 'does-not-exist'"), m -> m.matches("Creating workflow .* error handler .*-error-handler in task .*"), m -> m.matches("Starting .*-error-handler with 1 handler in task .*"), m -> m.matches("Creating handler .*-error-handler-1 'no-op' in task .*"),
