This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit f7298918ba675766fb2684d12280b61c3e8a36e0 Author: Alex Heneveld <[email protected]> AuthorDate: Thu Apr 13 18:11:52 2023 +0100 fix bug where cancelled workflow would give error releasing a lock --- .../brooklyn/camp/brooklyn/WorkflowYamlTest.java | 26 ++++++++++++++++++++++ .../core/workflow/WorkflowExecutionContext.java | 7 ++++-- .../util/core/task/BasicExecutionContext.java | 9 +++----- .../org/apache/brooklyn/util/javalang/Threads.java | 20 +++++++++++++++++ 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java index 51e4a1fe31..a6cf973ad7 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java @@ -734,6 +734,32 @@ public class WorkflowYamlTest extends AbstractYamlTest { Asserts.assertEquals(result, 11); } + @Test + public void testLockReleasedOnCancel() throws Exception { + Entity app = createAndStartApplication( + "services:", + "- type: " + BasicEntity.class.getName()); + Entity entity = Iterables.getOnlyElement(app.getChildren()); + WorkflowExecutionContext x1 = WorkflowBasicTest.runWorkflow(entity, Strings.lines( + "lock: x", + "steps:", + " - set-sensor boolean x1 = true", + " - sleep 5s", + " - return done"), "test"); + EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newBooleanSensor("x1"), true); + Asserts.assertFalse(x1.getTask(false).get().isDone()); + + WorkflowExecutionContext x2 = WorkflowBasicTest.runWorkflow(entity, Strings.lines( + "lock: x", + "steps:", + " - return done"), "test"); + // x2 will block + Asserts.assertFalse(x2.getTask(false).get().isDone()); + x1.getTask(false).get().cancel(true); + Asserts.assertEquals(x2.getTask(false).get().getUnchecked(), "done"); + Asserts.assertEquals(x2.getTask(false).get().getUnchecked(Duration.seconds(5)), "done"); + } + @Test(groups="Live") public void testContainerEchoBashCommandAsWorkflowEffectorWithVarFromConfig() throws Exception { WorkflowBasicTest.addRegisteredTypeBean(mgmt(), "container", ContainerWorkflowStep.class); diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java index d250900592..220afd3bd9 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java @@ -57,6 +57,7 @@ import org.apache.brooklyn.util.core.text.TemplateProcessor; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.javalang.Threads; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; @@ -896,8 +897,10 @@ public class WorkflowExecutionContext { if (Entities.isUnmanagingOrNoLongerManaged(getEntity())) { log.debug("Skipping clearance of lock on "+lockSensor.getName()+" in "+WorkflowExecutionContext.this+" because entity unmanaging here; expect auto-replay on resumption to pick up"); } else { - log.debug(WorkflowExecutionContext.this+" releasing lock on "+lockSensor.getName()); - ((EntityInternal.SensorSupportInternal) lockEntity.sensors()).remove(lockSensor); + Threads.runTemporarilyUninterrupted(() -> { + log.debug(WorkflowExecutionContext.this + " releasing lock on " + lockSensor.getName()); + ((EntityInternal.SensorSupportInternal) lockEntity.sensors()).remove(lockSensor); + }); } } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 5b182edd44..dc5dfad97f 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -57,6 +57,7 @@ import org.apache.brooklyn.util.core.task.BasicExecutionManager.BrooklynTaskLogg import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.javalang.Threads; import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; @@ -319,14 +320,10 @@ public class BasicExecutionContext extends AbstractExecutionContext { try { return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() { - public Maybe<T> call() { - boolean wasAlreadyInterrupted = Thread.interrupted(); + public Maybe<T> call() throws Exception { try { - return job.getImmediately(); + return Threads.runTemporarilyUninterrupted(job::getImmediately); } finally { - if (wasAlreadyInterrupted) { - Thread.currentThread().interrupt(); - } // we've acknowledged that getImmediate may wreck (cancel) the task, // their first priority is to prevent them from leaking; // however previously we did the cancel before running, diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/javalang/Threads.java b/utils/common/src/main/java/org/apache/brooklyn/util/javalang/Threads.java index 9d4625aedc..a7495333b4 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/javalang/Threads.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/javalang/Threads.java @@ -20,6 +20,7 @@ package org.apache.brooklyn.util.javalang; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -105,4 +106,23 @@ public class Threads { return !hooks.isEmpty(); } } + + public static <T> T runTemporarilyUninterrupted(Callable<T> r) throws Exception { + boolean wasAlreadyInterrupted = Thread.interrupted(); + try { + return r.call(); + } finally { + if (wasAlreadyInterrupted) { + Thread.currentThread().interrupt(); + } + } + } + + public static void runTemporarilyUninterrupted(Runnable r) { + try { + runTemporarilyUninterrupted(() -> { r.run(); return null; }); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } }
