This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 85d965153a491789f1367d3265d690f8e479422c Author: Alex Heneveld <[email protected]> AuthorDate: Mon Sep 13 17:08:04 2021 +0100 tidy task before/after model, ensure scheduled tasks clear their task context --- .../util/core/task/BasicExecutionContext.java | 50 +++-- .../util/core/task/BasicExecutionManager.java | 196 ++++++++++------ .../apache/brooklyn/util/core/task/BasicTask.java | 3 +- .../brooklyn/util/core/task/ScheduledTask.java | 2 + .../util/core/task/ScheduledExecutionTest.java | 248 +++++++++++++++------ 5 files changed, 338 insertions(+), 161 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index ca9e158..70322ce 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -361,33 +361,41 @@ public class BasicExecutionContext extends AbstractExecutionContext { taskTags.addAll(tags); - if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) + if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current()) && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { // tag as transient if submitter is transient, unless explicitly tagged as non-transient taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); } - final Object startCallback = properties.get("newTaskStartCallback"); - properties.put("newTaskStartCallback", new Function<Task<?>,Void>() { - @Override - public Void apply(Task<?> it) { - registerPerThreadExecutionContext(); - if (startCallback!=null) BasicExecutionManager.invokeCallback(startCallback, it); - return null; - }}); - - final Object endCallback = properties.get("newTaskEndCallback"); - properties.put("newTaskEndCallback", new Function<Task<?>,Void>() { - @Override - public Void apply(Task<?> it) { - try { - if (endCallback!=null) BasicExecutionManager.invokeCallback(endCallback, it); - } finally { - clearPerThreadExecutionContext(); + if (task instanceof ScheduledTask) { + // not run for scheduler + ((ScheduledTask)task).executionContext = this; + + } else { + final Object startCallback = properties.get("newTaskStartCallback"); + properties.put("newTaskStartCallback", new Function<Task<?>, Void>() { + @Override + public Void apply(Task<?> it) { + registerPerThreadExecutionContext(); + if (startCallback != null) BasicExecutionManager.invokeCallback(startCallback, it); + return null; } - return null; - }}); - + }); + + final Object endCallback = properties.get("newTaskEndCallback"); + properties.put("newTaskEndCallback", new Function<Task<?>, Void>() { + @Override + public Void apply(Task<?> it) { + try { + if (endCallback != null) BasicExecutionManager.invokeCallback(endCallback, it); + } finally { + clearPerThreadExecutionContext(); + } + return null; + } + }); + } + if (task instanceof Task) { return executionManager.submit(properties, (Task)task); } else if (task instanceof Callable) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 29984e2..f1496fc 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.internal.BrooklynLoggingCategories; +import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.mgmt.ExecutionManager; import org.apache.brooklyn.api.mgmt.HasTaskChildren; import org.apache.brooklyn.api.mgmt.Task; @@ -69,6 +70,7 @@ import org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.javalang.JavaClassNames; import org.apache.brooklyn.util.text.Identifiers; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.CountdownTimer; @@ -106,6 +108,8 @@ public class BasicExecutionManager implements ExecutionManager { public static final String LOGGING_MDC_KEY_ENTITY_IDS = "entity.ids"; public static final String LOGGING_MDC_KEY_TASK_ID = "task.id"; + private static final boolean SCHEDULED_TASKS_COUNT_AS_ACTIVE = false; + private boolean jitterThreads = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_JITTER_THREADS); private int jitterThreadsMaxDelay = Integer.getInteger(JITTER_THREADS_MAX_DELAY_PROPERTY, 200); @@ -525,10 +529,13 @@ public class BasicExecutionManager implements ExecutionManager { } protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) { - beforeSubmitScheduledTaskAllIterations(flags, task); - - if (!submitSubsequentScheduledTask(flags, task)) { - afterEndScheduledTaskAllIterations(flags, task, null); + boolean result = false; + try { + result = submitSubsequentScheduledTask(flags, task); + } finally { + if (!result) { + afterEndScheduledTaskAllIterations(flags, task, null); + } } return task; } @@ -555,27 +562,39 @@ public class BasicExecutionManager implements ExecutionManager { @Override @SuppressWarnings({ "rawtypes", "unchecked" }) public Object call() { - if (task.startTimeUtc==-1) { - // this is overwritten on each run; not sure if that's best or not - task.startTimeUtc = System.currentTimeMillis(); - } - TaskInternal<?> taskScheduled = null; + TaskInternal<?> taskIteration = null; Throwable error = null; try { - taskScheduled = (TaskInternal<?>) task.newTask(); - taskScheduled.setSubmittedByTask(task); - beforeStartScheduledTaskSubmissionIteration(flags, task, taskScheduled); - final Callable<?> oldJob = taskScheduled.getJob(); - final TaskInternal<?> taskScheduledF = taskScheduled; - taskScheduled.setJob(new Callable() { @Override public Object call() { + if (task.startTimeUtc==-1) { + beforeSubmitScheduledTaskAllIterations(flags, task); + beforeStartScheduledTaskAllIterations(flags, task); + + task.startTimeUtc = System.currentTimeMillis(); + } + + taskIteration = (TaskInternal<?>) task.newTask(); + taskIteration.setSubmittedByTask(task); + + beforeSubmitScheduledTaskSubmissionIteration(flags, task); + + final Callable<?> oldJob = taskIteration.getJob(); + final TaskInternal<?> taskIterationF = taskIteration; + taskIteration.setJob(new Callable() { @Override public Object call() { if (task.isCancelled()) { - afterEndScheduledTaskAllIterations(flags, task, new CancellationException("cancel detected")); - throw new CancellationException("cancel detected"); // above throws, but for good measure + CancellationException cancelDetected = new CancellationException("cancel detected"); + try { + afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, cancelDetected); + } finally { + // do in finally block so runs even if above throws cancelDetected + afterEndScheduledTaskAllIterations(flags, task, cancelDetected); + } + throw cancelDetected; } Throwable lastError = null; boolean shouldResubmit = true; - task.recentRun = taskScheduledF; - try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(task).start()) { + task.recentRun = taskIterationF; + try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(taskIterationF).start()) { + beforeStartScheduledTaskSubmissionIteration(flags, task, taskIterationF); synchronized (task) { task.notifyAll(); } @@ -590,26 +609,33 @@ public class BasicExecutionManager implements ExecutionManager { } return result; } finally { - // do in finally block in case we were interrupted - if (shouldResubmit && resubmit()) { - // resubmitted fine, no-op - } else { - // not resubmitted, note ending - afterEndScheduledTaskAllIterations(flags, task, lastError); + if (!task.isCancelled() || task.getEndTimeUtc()<=0) { + // don't re-run on cancellation + + afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, lastError); + // do in finally block in case we were interrupted + if (shouldResubmit && resubmit()) { + // resubmitted fine, no-op + } else { + // not resubmitted, note ending + afterEndScheduledTaskAllIterations(flags, task, lastError); + } } } }}); - task.nextRun = taskScheduled; - BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); - if (ec!=null) return ec.submit(taskScheduled); - else return submit(taskScheduled); + task.nextRun = taskIteration; + ExecutionContext ec = + // no longer associated the execution context on each execution; +// BasicExecutionContext.getCurrentExecutionContext(); + // instead it is set on the task + task.executionContext; + if (ec!=null) return ec.submit(taskIteration); + else return submit(taskIteration); } catch (Exception e) { error = e; + afterEndScheduledTaskSubmissionIteration(flags, task, taskIteration, error); throw Exceptions.propagate(e); - - } finally { - afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled, error); } } @@ -804,8 +830,9 @@ public class BasicExecutionManager implements ExecutionManager { } } - if (task instanceof ScheduledTask) - return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task); + if (task instanceof ScheduledTask) { + return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask) task); + } beforeSubmitAtomicTask(flags, task); @@ -850,6 +877,11 @@ public class BasicExecutionManager implements ExecutionManager { } protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { + // for these, beforeSubmitAtomicTask is not called, + // but beforeStartAtomic and afterSubmitAtomic _are_ called + internalBeforeSubmit(flags, task); + } + protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) { internalBeforeSubmit(flags, task); } protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) { @@ -916,20 +948,25 @@ public class BasicExecutionManager implements ExecutionManager { } } - protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Task<?> taskIteration) { - internalBeforeStart(flags, taskRepeatedlyScheduling, true); + /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */ + protected void beforeStartScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule) { + internalBeforeStart(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, true, true); + } + protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheScheduling, Task<?> taskIteration) { + // no-op, because handled as an atomic task + // internalBeforeStart(flags, taskIteration, true, false); } protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) { - internalBeforeStart(flags, task, true); + internalBeforeStart(flags, task, false, true, false); } protected void beforeStartInSameThreadTask(Map<?,?> flags, Task<?> task) { - internalBeforeStart(flags, task, false); + internalBeforeStart(flags, task, false, false, false); } /** invoked in a task's thread when a task is starting to run (may be some time after submitted), * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ - protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean allowJitter) { - int count = activeTaskCount.incrementAndGet(); + protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean skipIncrementCounter, boolean allowJitter, boolean startingThisThreadMightEndElsewhere) { + int count = skipIncrementCounter ? activeTaskCount.get() : activeTaskCount.incrementAndGet(); if (count % 1000==0) { log.warn("High number of active tasks: task #"+count+" is "+task); } @@ -939,19 +976,21 @@ public class BasicExecutionManager implements ExecutionManager { if (!task.isCancelled()) { Thread thread = Thread.currentThread(); ((TaskInternal<?>)task).setThread(thread); - if (RENAME_THREADS) { - threadOriginalName.set(thread.getName()); - String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8); - thread.setName(newThreadName); + if (!startingThisThreadMightEndElsewhere) { + if (RENAME_THREADS) { + threadOriginalName.set(thread.getName()); + String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8); + thread.setName(newThreadName); + } + PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); } - PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis()); } if (allowJitter) { jitterThreadStart(task); } - if (flags!=null) { + if (flags!=null && !startingThisThreadMightEndElsewhere) { invokeCallback(flags.get("newTaskStartCallback"), task); } } @@ -992,23 +1031,33 @@ public class BasicExecutionManager implements ExecutionManager { } private static boolean loggedClosureDeprecatedInInvokeCallback; - /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */ - protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Throwable error) { - internalAfterEnd(flags, taskRepeatedlyScheduling, false, true, error); + /** normally (if not interrupted) called once for each call to {@link #beforeStartScheduledTaskAllIterations(Map, Task)} */ + protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Throwable error) { + boolean taskWasSubmittedAndNotYetEnded = true; + try { + taskWasSubmittedAndNotYetEnded = internalAfterEnd(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, false, error); + } finally { + synchronized (taskDoingTheInitialSchedule) { taskDoingTheInitialSchedule.notifyAll(); } + if (taskWasSubmittedAndNotYetEnded) { + // prevent from running twice on cancellation after start + ((TaskInternal<?>) taskDoingTheInitialSchedule).runListeners(); + } + } } /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)}, * with a per-iteration task generated by the surrounding scheduled task */ - protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskRepeatedlyScheduling, Task<?> taskIteration, Throwable error) { - internalAfterEnd(flags, taskRepeatedlyScheduling, true, false, error); + protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) { + // no-op because handled as an atomic task + // internalAfterEnd(flags, taskIteration, false, true, error); } /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked, * and normally (if not interrupted prior to start) * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */ protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task, Throwable error) { - internalAfterEnd(flags, task, true, true, error); + internalAfterEnd(flags, task, false, true, error); } protected void afterEndInSameThreadTask(Map<?,?> flags, Task<?> task, Throwable error) { - internalAfterEnd(flags, task, true, true, error); + internalAfterEnd(flags, task, false, true, error); } protected void afterEndForCancelBeforeStart(Map<?,?> flags, Task<?> task, boolean calledFromCanceller) { if (calledFromCanceller) { @@ -1025,28 +1074,37 @@ public class BasicExecutionManager implements ExecutionManager { // to ensure listeners and callback only invoked once } } - internalAfterEnd(flags, task, !calledFromCanceller, true, null); + internalAfterEnd(flags, task, true, !calledFromCanceller, null); } /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)}, * and, for atomic tasks and scheduled-task submission iterations where - * always called once if {@link #internalBeforeStart(Map, Task, boolean)} is invoked and in the same thread as that method */ - protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations, Throwable error) { + * always called once if {@link #internalBeforeStart(Map, Task, boolean, boolean, boolean)} is invoked and if possible + * (but not possible for isEndingAllIterations) in the same thread as that method */ + protected boolean internalAfterEnd(Map<?,?> flags, Task<?> task, boolean skipDecrementCounter, boolean startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) { boolean taskWasSubmittedAndNotYetEnded = true; try { if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task); - if (startedInThisThread) { + taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId()); + // this method might be called more than once, eg if cancelled, so use the above as a guard where single invocation is needed (eg counts) + + if (!skipDecrementCounter && taskWasSubmittedAndNotYetEnded) { activeTaskCount.decrementAndGet(); } - if (isEndingAllIterations) { - taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId()); - if (flags!=null && taskWasSubmittedAndNotYetEnded) { - invokeCallback(flags.get("newTaskEndCallback"), task); + + if (flags!=null && taskWasSubmittedAndNotYetEnded && startedGuaranteedToEndInSameThreadAndEndingSameThread) { + invokeCallback(flags.get("newTaskEndCallback"), task); + } + if (task.getEndTimeUtc()>0) { + if (taskWasSubmittedAndNotYetEnded) { + // shouldn't happen + log.debug("Task "+task+" has end time "+task.getEndTimeUtc()+" but was marked as incomplete"); } - ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis()); + } else { + ((TaskInternal<?>) task).setEndTimeUtc(System.currentTimeMillis()); } - - if (startedInThisThread) { + + if (startedGuaranteedToEndInSameThreadAndEndingSameThread) { PerThreadCurrentTaskHolder.perThreadCurrentTask.remove(); //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time if (RENAME_THREADS) { @@ -1058,15 +1116,18 @@ public class BasicExecutionManager implements ExecutionManager { threadOriginalName.remove(); } } - ((TaskInternal<?>)task).setThread(null); } + ((TaskInternal<?>)task).setThread(null); + } finally { try { if (error!=null) { /* we throw, after logging debug. * the throw means the error is available for task submitters to monitor. * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. - * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) + * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) + * + * Note in particular that scheduled tasks will typically swallow this and simply re-submit */ if (log.isDebugEnabled()) { // debug only here, because most submitters will handle failures @@ -1088,12 +1149,13 @@ public class BasicExecutionManager implements ExecutionManager { } } finally { synchronized (task) { task.notifyAll(); } - if (isEndingAllIterations && taskWasSubmittedAndNotYetEnded) { + if (taskWasSubmittedAndNotYetEnded) { // prevent from running twice on cancellation after start ((TaskInternal<?>)task).runListeners(); } } } + return taskWasSubmittedAndNotYetEnded; } public TaskScheduler getTaskSchedulerForTag(Object tag) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java index b5304e1..a6fe8d0 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java @@ -553,8 +553,7 @@ public class BasicTask<T> implements TaskInternal<T> { else if (!isCancelled() && startTimeUtc <= 0) { rv = "Submitted for execution"; if (verbosity>0) { - long elapsed = System.currentTimeMillis() - submitTimeUtc; - rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago"; + rv += " "+Time.makeTimeStringRoundedSince(submitTimeUtc)+" ago"; } if (verbosity >= 2 && getExtraStatusText()!=null) { rv += "\n\n"+getExtraStatusText(); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java index beabe30..8851971 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java @@ -18,6 +18,7 @@ */ package org.apache.brooklyn.util.core.task; +import org.apache.brooklyn.api.mgmt.ExecutionContext; import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis; import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; @@ -73,6 +74,7 @@ public class ScheduledTask extends BasicTask<Object> { */ protected boolean cancelOnException = true; + protected ExecutionContext executionContext; protected int runCount=0; protected Task<?> recentRun, nextRun; Class<? extends Exception> lastThrownType; diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java index 166da59..fea7234 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java @@ -18,6 +18,10 @@ */ package org.apache.brooklyn.util.core.task; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.core.test.entity.TestEntityImpl; +import org.apache.brooklyn.util.collections.MutableList; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -49,36 +53,37 @@ import com.google.common.collect.Lists; public class ScheduledExecutionTest { public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class); - + @Test public void testScheduledTask() throws Exception { Duration PERIOD = Duration.millis(20); BasicExecutionManager m = new BasicExecutionManager("mycontextid"); final AtomicInteger i = new AtomicInteger(0); ScheduledTask t = ScheduledTask.builder(() -> new BasicTask<Integer>(() -> { - log.info("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - return i.incrementAndGet(); - })) + log.info("task running: " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)); + return i.incrementAndGet(); + })) .displayName("test-1") .delay(PERIOD.multiply(2)) .period(PERIOD) .maxIterations(5) .build(); - + log.info("submitting {} {}", t, t.getStatusDetail(false)); m.submit(t); log.info("submitted {} {}", t, t.getStatusDetail(false)); Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); - assertTrue(i.get() > 0, "i="+i); + log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)}); + assertTrue(i.get() > 0, "i=" + i); t.blockUntilEnded(); Integer finalResult = (Integer) t.get(); - log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)}); - assertEquals(finalResult, (Integer)5); + log.info("ended ({}) {} {}", new Object[]{finalResult, t, t.getStatusDetail(false)}); + assertEquals(finalResult, (Integer) 5); assertEquals(i.get(), 5); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } - @Test + @Test(groups="Integration") public void testScheduledTaskCancelledIfExceptionThrown() throws Exception { BasicExecutionManager m = new BasicExecutionManager("mycontextid"); final AtomicInteger calls = new AtomicInteger(0); @@ -90,12 +95,15 @@ public class ScheduledExecutionTest { public Integer call() { calls.incrementAndGet(); throw new RuntimeException("boo"); - }}); - }}); + } + }); + } + }); m.submit(t); Runnable callsIsOne = new Runnable() { - @Override public void run() { + @Override + public void run() { if (calls.get() != 1) { throw new RuntimeException("not yet"); } @@ -104,6 +112,7 @@ public class ScheduledExecutionTest { }; Asserts.succeedsEventually(callsIsOne); Asserts.succeedsContinually(callsIsOne); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } @Test @@ -118,44 +127,52 @@ public class ScheduledExecutionTest { public Integer call() { calls.incrementAndGet(); throw new RuntimeException("boo"); - }}); - }}); + } + }); + } + }); m.submit(t); t.blockUntilEnded(); assertEquals(calls.get(), 5, "Expected task to be resubmitted despite throwing an exception"); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } - /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */ + /** + * like testScheduledTask but the loop is terminated by the task itself adjusting the period + */ @Test public void testScheduledTaskSelfEnding() throws Exception { int PERIOD = 20; BasicExecutionManager m = new BasicExecutionManager("mycontextid"); final AtomicInteger i = new AtomicInteger(0); - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() { + ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2 * PERIOD, "period", PERIOD), new Callable<Task<?>>() { @Override public Task<?> call() throws Exception { return new BasicTask<Integer>(new Callable<Integer>() { @Override public Integer call() { - ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask(); + ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask(); if (i.get() >= 4) submitter.period = null; - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); + log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)); return i.incrementAndGet(); - }}); - }}); - + } + }); + } + }); + log.info("submitting {} {}", t, t.getStatusDetail(false)); m.submit(t); log.info("submitted {} {}", t, t.getStatusDetail(false)); Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); + log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)}); assertTrue(i.get() > 0); t.blockUntilEnded(); Integer finalResult = (Integer) t.get(); - log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)}); - assertEquals(finalResult, (Integer)5); + log.info("ended ({}) {} {}", new Object[]{finalResult, t, t.getStatusDetail(false)}); + assertEquals(finalResult, (Integer) 5); assertEquals(i.get(), 5); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } @Test @@ -169,35 +186,38 @@ public class ScheduledExecutionTest { return new BasicTask<Integer>(new Callable<Integer>() { @Override public Integer call() { - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask(); + log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)); + ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask(); i.incrementAndGet(); if (i.get() >= 5) submitter.cancel(); return i.get(); - }}); - }}); - - log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false)); + } + }); + } + }); + + log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false)); m.submit(t); log.info("submitted {} {}", t, t.getStatusDetail(false)); Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); + log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)}); assertTrue(i.get() > 0); t.blockUntilEnded(); // int finalResult = t.get() - log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)}); + log.info("ended ({}) {} {}", new Object[]{i, t, t.getStatusDetail(false)}); // assertEquals(finalResult, 5) assertEquals(i.get(), 5); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } - @Test(groups="Integration") + @Test(groups = "Integration") public void testScheduledTaskCancelOuter() throws Exception { final Duration PERIOD = Duration.millis(20); final Duration CYCLE_DELAY = Duration.ONE_SECOND; // this should be enough to start the next cycle, but not so much that the cycle ends; // and enough that when a task is interrupted it terminates within this period final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1)); - + BasicExecutionManager m = new BasicExecutionManager("mycontextid"); final AtomicInteger i = new AtomicInteger(); ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() { @@ -206,48 +226,51 @@ public class ScheduledExecutionTest { return new BasicTask<Integer>(new Callable<Integer>() { @Override public Integer call() { - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); + log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)); Time.sleep(CYCLE_DELAY); i.incrementAndGet(); return i.get(); - }}); - }}); - - log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false)); + } + }); + } + }); + + log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false)); m.submit(t); log.info("submitted {} {}", t, t.getStatusDetail(false)); Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); + log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)}); assertEquals(i.get(), 1); - + Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); assertEquals(t.get(), 2); - + Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); Stopwatch timer = Stopwatch.createUnstarted(); t.cancel(true); t.blockUntilEnded(); // int finalResult = t.get() - log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); + log.info("blocked until ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)}); try { t.get(); - Assert.fail("Should have failed getting result of cancelled "+t); + Assert.fail("Should have failed getting result of cancelled " + t); } catch (Exception e) { /* expected */ } assertEquals(i.get(), 2); - log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); + log.info("ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)}); Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY)); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } - @Test(groups="Integration") + @Test(groups = "Integration") public void testScheduledTaskCancelInterrupts() throws Exception { final Duration PERIOD = Duration.millis(20); final Duration CYCLE_DELAY = Duration.ONE_SECOND; // this should be enough to start the next cycle, but not so much that the cycle ends; // and enough that when a task is interrupted it terminates within this period final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1)); - + BasicExecutionManager m = new BasicExecutionManager("mycontextid"); final Semaphore interruptedSemaphore = new Semaphore(0); final AtomicInteger i = new AtomicInteger(); @@ -258,7 +281,7 @@ public class ScheduledExecutionTest { @Override public Integer call() { try { - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); + log.info("task running (" + i + "): " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)); Time.sleep(CYCLE_DELAY); i.incrementAndGet(); return i.get(); @@ -266,45 +289,48 @@ public class ScheduledExecutionTest { interruptedSemaphore.release(); throw Exceptions.propagate(e); } - }}); - }}); - - log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false)); + } + }); + } + }); + + log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t, t.getStatusDetail(false)); m.submit(t); log.info("submitted {} {}", t, t.getStatusDetail(false)); Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); + log.info("done one ({}) {} {}", new Object[]{interimResult, t, t.getStatusDetail(false)}); assertEquals(i.get(), 1); - + Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); assertEquals(t.get(), 2); - + Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); Stopwatch timer = Stopwatch.createUnstarted(); t.cancel(true); t.blockUntilEnded(); // int finalResult = t.get() - log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); + log.info("blocked until ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)}); try { t.get(); - Assert.fail("Should have failed getting result of cancelled "+t); + Assert.fail("Should have failed getting result of cancelled " + t); } catch (Exception e) { /* expected */ } assertEquals(i.get(), 2); Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted"); - log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); + log.info("ended ({}) {} {}, in {}", new Object[]{i, t, t.getStatusDetail(false), Duration.of(timer)}); Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY)); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } - @Test(groups="Integration") + @Test(groups = "Integration") public void testScheduledTaskTakesLongerThanPeriod() throws Exception { final int PERIOD = 1; final int SLEEP_TIME = 100; final int EARLY_RETURN_GRACE = 10; BasicExecutionManager m = new BasicExecutionManager("mycontextid"); final List<Long> execTimes = new CopyOnWriteArrayList<Long>(); - + ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() { @Override public Task<?> call() throws Exception { @@ -317,17 +343,20 @@ public class ScheduledExecutionTest { } catch (InterruptedException e) { throw Exceptions.propagate(e); } - }}); - }}); - + } + }); + } + }); + m.submit(t); - + Asserts.succeedsEventually(new Runnable() { @Override public void run() { - assertTrue(execTimes.size() > 3, "size="+execTimes.size()); - }}); - + assertTrue(execTimes.size() > 3, "size=" + execTimes.size()); + } + }); + List<Long> timeDiffs = Lists.newArrayList(); long prevExecTime = -1; for (Long execTime : execTimes) { @@ -338,9 +367,86 @@ public class ScheduledExecutionTest { prevExecTime = execTime; } } - + for (Long timeDiff : timeDiffs) { - if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes); + if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) + fail("timeDiffs=" + timeDiffs + "; execTimes=" + execTimes); } + + t.cancel(); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); } -} + + @Test(groups="Integration") // because slow + public void testScheduledTaskInContextClearing() throws Exception { + Duration PERIOD = Duration.millis(50); + int COUNT = 10; + BasicExecutionManager m = new BasicExecutionManager("mycontextid"); + + final List<String> errors = MutableList.of(); + + final AtomicInteger i2 = new AtomicInteger(); + // now start other task, in entity context + ScheduledTask t2 = new ScheduledTask(MutableMap.of("displayName", "t2", "period", PERIOD), new Callable<Task<?>>() { + @Override + public Task<?> call() throws Exception { + return new BasicTask<Integer>(MutableMap.of("displayName", "t2-i"), new Callable<Integer>() { + @Override + public Integer call() { + Entity ce = BrooklynTaskTags.getContextEntity(Tasks.current()); + log.info("entity task t2 running (" + i2 + "): " + Thread.currentThread() + " " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)+" - "+ce); + if (ce==null) { + errors.add("Scheduled task t2 missing context entity"); + } + + ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask(); + i2.incrementAndGet(); + if (i2.get() >= COUNT) submitter.cancel(); + return i2.get(); + } + }); + } + }); + Entity contextEntity = new TestEntityImpl(); + BasicExecutionContext exec = new BasicExecutionContext(m, MutableList.of(BrooklynTaskTags.tagForContextEntity(contextEntity))); + exec.submit(t2); + + final AtomicInteger i1 = new AtomicInteger(); + + ScheduledTask t1 = new ScheduledTask(MutableMap.of("displayName", "t1", "period", PERIOD), new Callable<Task<?>>() { + @Override + public Task<?> call() throws Exception { + return new BasicTask<Integer>(MutableMap.of("displayName", "t1-i"), new Callable<Integer>() { + @Override + public Integer call() { + Entity ce = BrooklynTaskTags.getContextEntity(Tasks.current()); + log.info("non-entity task t1 running (" + i1 + "): " + Thread.currentThread() + " " + Tasks.current() + " " + Tasks.current().getStatusDetail(false)+" - "+ce); + if (ce!=null) { + errors.add("Scheduled task t1 has context entity "+ce); + } + + ScheduledTask submitter = (ScheduledTask) ((BasicTask) Tasks.current()).getSubmittedByTask(); + i1.incrementAndGet(); + if (i1.get() >= COUNT) submitter.cancel(); + return i1.get(); + } + }); + } + }); + + Time.sleep(70); + log.info(JavaClassNames.niceClassAndMethod() + " - submitting {} {}", t1, t1.getStatusDetail(false)); + m.submit(t1); + log.info("submitted {} {}", t1, t1.getStatusDetail(false)); + Integer interimResult = (Integer) t1.get(); + log.info("done one ({}) {} {}", new Object[]{interimResult, t1, t1.getStatusDetail(false)}); + assertTrue(i1.get() > 0); + + t1.blockUntilEnded(); + t2.blockUntilEnded(); + + Asserts.assertSize(errors, 0); + Asserts.eventually(m::getNumActiveTasks, l -> l==0); + } + +} \ No newline at end of file
