This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit b760aead06eea6f2a0e395497a4de48a5fc722f8 Author: Alex Heneveld <[email protected]> AuthorDate: Tue Sep 14 17:03:28 2021 +0100 fix race where persister might still be reading from hot standby doesn't affect behaviour, just prevents errors. also improve scheduled task logging. --- .../rebind/mementos/BrooklynMementoPersister.java | 2 + .../core/mgmt/ha/HighAvailabilityManagerImpl.java | 5 + .../BrooklynMementoPersisterToObjectStore.java | 47 +- .../core/mgmt/rebind/RebindManagerImpl.java | 7 + .../util/core/task/BasicExecutionManager.java | 682 ++++++++++++--------- .../brooklyn/util/core/task/ScheduledTask.java | 6 +- .../exceptions/RuntimeInterruptedException.java | 5 + 7 files changed, 456 insertions(+), 298 deletions(-) diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java index 2090c80..163a0ff 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java @@ -109,6 +109,8 @@ public interface BrooklynMementoPersister { void disableWriteAccess(boolean graceful); /** permanently shuts down all access to the remote store */ void stop(boolean graceful); + /** cancels/waits for all current tasks */ + void reset(); @VisibleForTesting void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java index 8c63775..9fa2ead 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java @@ -891,6 +891,11 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager { protected void promoteToMaster() { LOG.debug("Promoting to master: "+this); + if (Tasks.current()!=null) { + // let us check if promotion is happening in the right context + Task task = Tasks.current(); + LOG.debug("Task context for master promotion: "+task+" ("+task.getTags()+"); "+task.getStatusSummary()); + } if (!running) { LOG.warn("Ignoring promote-to-master request, as HighAvailabilityManager is not running"); return; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java index 5f16faf..e3b3540 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java @@ -43,6 +43,7 @@ import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento; import org.apache.brooklyn.api.mgmt.rebind.mementos.ManagedBundleMemento; @@ -71,6 +72,7 @@ import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.xstream.XmlUtil; import org.apache.brooklyn.util.exceptions.CompoundRuntimeException; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; @@ -117,7 +119,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>(); - private final ListeningExecutorService executor; + private ListeningExecutorService executor; private volatile boolean writesAllowed = false; private volatile boolean writesShuttingDown = false; @@ -154,8 +156,6 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer .withClassLoader(classLoader).build(); this.serializerWithStandardClassLoader = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts); - int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE); - objectStore.createSubPath("entities"); objectStore.createSubPath("locations"); objectStore.createSubPath("policies"); @@ -166,11 +166,11 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer // FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ? objectStore.createSubPath("plane"); - executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() { - @Override public Thread newThread(Runnable r) { - // Note: Thread name referenced in logback-includes' ThreadNameDiscriminator - return new Thread(r, "brooklyn-persister"); - }})); + resetExecutor(); + } + + private Integer maxThreadPoolSize() { + return brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE); } public MementoSerializer<Object> getMementoSerializer() { @@ -210,8 +210,14 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer final ManagementContext managementContext = lookupContext.lookupManagementContext(); RegisteredType catalogItem = managementContext.getTypeRegistry().get(catalogItemId); if (catalogItem == null) { - // TODO do we need to only log once, rather than risk log.warn too often? I think this only happens on rebind, so ok. - LOG.warn("Unable to load catalog item "+catalogItemId + // will happen on rebind if our execution should have ended + if (Thread.interrupted()) { + LOG.debug("Aborting (probably old) rebind iteration"); + throw new RuntimeInterruptedException("Rebind iteration cancelled"); + } + + // might come here for other reasons too + LOG.debug("Unable to load registered type "+catalogItemId +" for custom class loader of " + type + " " + objectId + "; will use default class loader"); return null; } else { @@ -245,7 +251,24 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer @Override public void stop(boolean graceful) { disableWriteAccess(graceful); - + stopExecutor(graceful); + } + + @Override + public void reset() { + resetExecutor(); + } + + public void resetExecutor() { + stopExecutor(false); + executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize(), new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + // Note: Thread name referenced in logback-includes' ThreadNameDiscriminator + return new Thread(r, "brooklyn-persister"); + }})); + } + + public void stopExecutor(boolean graceful) { if (executor != null) { if (graceful) { executor.shutdown(); @@ -473,6 +496,8 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer @Override public BrooklynMemento loadMemento(BrooklynMementoRawData mementoData, final LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException { + LOG.debug("Loading mementos"); + if (mementoData==null) mementoData = loadMementoRawData(exceptionHandler); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index 899e10b..c45c96f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -396,6 +396,9 @@ public class RebindManagerImpl implements RebindManager { LOG.warn("Rebind (read-only) tasks took too long to die after interrupt (ignoring): "+readOnlyTask); } readOnlyTask = null; + if (persistenceStoreAccess!=null) { + persistenceStoreAccess.reset(); + } LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId()); // short waits when promoting @@ -404,6 +407,7 @@ public class RebindManagerImpl implements RebindManager { Duration.seconds(5)); // note, items are subsequently unmanaged via: // HighAvailabilityManagerImpl.clearManagedItems + readOnlyRebindCount.set(0); } } @@ -467,6 +471,9 @@ public class RebindManagerImpl implements RebindManager { @Override public void reset() { if (persistenceRealChangeListener != null && !persistenceRealChangeListener.isActive()) persistenceRealChangeListener.reset(); + if (persistenceStoreAccess!=null) { + persistenceStoreAccess.reset(); + } } @Override diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 41a704e..88812e0 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -59,6 +59,7 @@ import org.apache.brooklyn.api.mgmt.TaskAdaptable; import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext; import org.apache.brooklyn.core.BrooklynFeatureEnablement; import org.apache.brooklyn.core.BrooklynLogging; +import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel; import org.apache.brooklyn.core.config.Sanitizer; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; @@ -133,16 +134,18 @@ public class BasicExecutionManager implements ExecutionManager { public static BrooklynTaskLoggingMdc create() { return new BrooklynTaskLoggingMdc(); } + public static BrooklynTaskLoggingMdc create(Task task) { return new BrooklynTaskLoggingMdc().withTask(task); } boolean isRedundant = false; Task task; - MDC.MDCCloseable taskMdc=null, entityMdc=null; + MDC.MDCCloseable taskMdc = null, entityMdc = null; String prevTaskMdc, prevEntityMdc; - private BrooklynTaskLoggingMdc() {} + private BrooklynTaskLoggingMdc() { + } public BrooklynTaskLoggingMdc withTask(Task task) { this.task = task; @@ -156,7 +159,7 @@ public class BasicExecutionManager implements ExecutionManager { // the log _message_ shows who submitted it, which is more reliable than // the prevTaskMdc as we might be in a different thread and/or the prevTaskMdc // can misleadingly point point to the task which triggered the executor - if (task!=null) { + if (task != null) { prevTaskMdc = MDC.get(LOGGING_MDC_KEY_TASK_ID); if (Objects.equals(task.getId(), prevTaskMdc)) { isRedundant = true; @@ -172,16 +175,20 @@ public class BasicExecutionManager implements ExecutionManager { entityMdc = MDC.putCloseable(LOGGING_MDC_KEY_ENTITY_IDS, ""); } - logEvent("Starting task", task, entity); + logStartEvent("Starting task", task, entity); return this; } - public static void logEvent(String prefix, Task task, Entity entity) { + public static void logStartEvent(String prefix, Task task, Entity entity) { if (BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isDebugEnabled() || BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isTraceEnabled()) { + if (entity==null) { + entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); + } + String taskName = task.getDisplayName(); String message = prefix + " " + task.getId() + - (Strings.isNonBlank(taskName) ? " ("+taskName+")" : "") + + (Strings.isNonBlank(taskName) ? " (" + taskName + ")" : "") + (entity == null ? "" : " on entity " + entity.getId()) + (Strings.isNonBlank(task.getSubmittedByTaskId()) ? " from task " + task.getSubmittedByTaskId() : "") + Entitlements.getEntitlementContextUserMaybe(). @@ -197,24 +204,29 @@ public class BasicExecutionManager implements ExecutionManager { } } - public void finish() { - if (isRedundant) { - return; - } - if (BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isDebugEnabled() || BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isTraceEnabled()){ + public static void logEndEvent(String prefix, Task task) { + if (BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isDebugEnabled() || BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isTraceEnabled()) { String taskName = task.getDisplayName(); BrooklynLogging.log(BrooklynLoggingCategories.TASK_LIFECYCLE_LOG, UNINTERESTING_TASK_NAMES.contains(taskName) ? BrooklynLogging.LoggingLevel.TRACE : BrooklynLogging.LoggingLevel.DEBUG, - "Ending task {}", task.getId()); + prefix + " " + task.getId()); + } + } + + public void finish() { + if (isRedundant) { + return; } + // not normally used for scheduled tasks + logEndEvent(task instanceof ScheduledTask ? "Ending scheduled task context" : "Ending task", task); if (entityMdc != null) { entityMdc.close(); - if (prevEntityMdc!=null) MDC.put(LOGGING_MDC_KEY_ENTITY_IDS, prevEntityMdc); + if (prevEntityMdc != null) MDC.put(LOGGING_MDC_KEY_ENTITY_IDS, prevEntityMdc); prevEntityMdc = null; } if (taskMdc != null) { taskMdc.close(); - if (prevTaskMdc!=null) MDC.put(LOGGING_MDC_KEY_TASK_ID, prevTaskMdc); + if (prevTaskMdc != null) MDC.put(LOGGING_MDC_KEY_TASK_ID, prevTaskMdc); prevTaskMdc = null; } } @@ -224,131 +236,141 @@ public class BasicExecutionManager implements ExecutionManager { } } - public static void registerUninterestingTaskName(String taskName){ - registerUninterestingTaskName(taskName,false); + public static void registerUninterestingTaskName(String taskName) { + registerUninterestingTaskName(taskName, false); } - public static void registerUninterestingTaskName(String taskName, boolean registerScheduledPrefix){ - log.debug("Registering '{}' as UninterestingTaskName. Starting finishing trace will be log as trace",taskName); + + public static void registerUninterestingTaskName(String taskName, boolean registerScheduledPrefix) { + log.debug("Registering '{}' as UninterestingTaskName. Starting finishing trace will be log as trace", taskName); UNINTERESTING_TASK_NAMES.add(taskName); - if(registerScheduledPrefix) { + if (registerScheduledPrefix) { UNINTERESTING_TASK_NAMES.add(ScheduledTask.prefixScheduledName(taskName)); } } private final ThreadFactory threadFactory; - + private final ThreadFactory daemonThreadFactory; - + private final ExecutorService runner; - + private final ScheduledExecutorService delayedRunner; // inefficient having so many records, and also doing searches through ... // many things in here could be more efficient however (different types of lookup etc), // do that when we need to. - + //access to this field AND to members in this field is synchronized, //to allow us to preserve order while guaranteeing thread-safe //(but more testing is needed before we are completely sure it is thread-safe!) //synch blocks are as finely grained as possible for efficiency; //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty - private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>(); - - private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>(); + private Map<Object, Set<Task<?>>> tasksByTag = new HashMap<Object, Set<Task<?>>>(); + + private ConcurrentMap<String, Task<?>> tasksById = new ConcurrentHashMap<String, Task<?>>(); private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>(); - /** count of all tasks submitted, including finished */ + /** + * count of all tasks submitted, including finished + */ private final AtomicLong totalTaskCount = new AtomicLong(); - - /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */ + + /** + * tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) + */ private Set<String> incompleteTaskIds = Sets.newConcurrentHashSet(); - - /** tasks started but not yet finished */ + + /** + * tasks started but not yet finished + */ private final AtomicInteger activeTaskCount = new AtomicInteger(); - + private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>(); - + private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() { @Override protected String initialValue() { // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked - log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current()); - return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8); + log.warn("No original name recorded for thread " + Thread.currentThread().getName() + "; task " + Tasks.current()); + return "brooklyn-thread-pool-" + Identifiers.makeRandomId(8); } }; - + public BasicExecutionManager(String contextid) { threadFactory = newThreadFactory(contextid); daemonThreadFactory = new ThreadFactoryBuilder() .setThreadFactory(threadFactory) .setDaemon(true) .build(); - + // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout of 1s rather than 60s for better shutdown! - runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), daemonThreadFactory); - + delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); if (jitterThreads) { log.info("Task startup jittering enabled with a maximum of " + jitterThreadsMaxDelay + " delay."); } } - + private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { - log.error("Uncaught exception in thread "+t.getName(), e); + log.error("Uncaught exception in thread " + t.getName(), e); } } - - /** + + /** * For use by overriders to use custom thread factory. * But be extremely careful: called by constructor, so before sub-class' constructor will * have been invoked! */ protected ThreadFactory newThreadFactory(String contextid) { return new ThreadFactoryBuilder() - .setNameFormat("brooklyn-execmanager-"+contextid+"-%d") + .setNameFormat("brooklyn-execmanager-" + contextid + "-%d") .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation()) .build(); } - + public void shutdownNow() { shutdownNow(null); } - - /** shuts down the executor, and if a duration is supplied awaits termination for that long. + + /** + * shuts down the executor, and if a duration is supplied awaits termination for that long. + * * @return whether everything is terminated */ @Beta public boolean shutdownNow(Duration howLongToWaitForTermination) { runner.shutdownNow(); delayedRunner.shutdownNow(); - if (howLongToWaitForTermination!=null) { + if (howLongToWaitForTermination != null) { CountdownTimer timer = howLongToWaitForTermination.countdownTimer(); try { runner.awaitTermination(timer.getDurationRemaining().toMilliseconds(), TimeUnit.MILLISECONDS); - if (timer.isLive()) delayedRunner.awaitTermination(timer.getDurationRemaining().toMilliseconds(), TimeUnit.MILLISECONDS); + if (timer.isLive()) + delayedRunner.awaitTermination(timer.getDurationRemaining().toMilliseconds(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw Exceptions.propagate(e); } } return runner.isTerminated() && delayedRunner.isTerminated(); } - + public void addListener(ExecutionListener listener) { listeners.add(listener); } - + public void removeListener(ExecutionListener listener) { listeners.remove(listener); } - + /** * Deletes the given tag, including all tasks using this tag. - * + * <p> * Useful, for example, if an entity is being expunged so that we don't keep holding * a reference to it as a tag. */ @@ -367,9 +389,9 @@ public class BasicExecutionManager implements ExecutionManager { public void deleteTask(Task<?> task) { boolean removed = deleteTaskNonRecursive(task); if (!removed) return; - + if (task instanceof HasTaskChildren) { - List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren)task).getChildren()); + List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren) task).getChildren()); for (Task<?> child : children) { deleteTask(child); } @@ -391,10 +413,10 @@ public class BasicExecutionManager implements ExecutionManager { } Task<?> removed = tasksById.remove(task.getId()); incompleteTaskIds.remove(task.getId()); - if (removed!=null && removed.isSubmitted() && !removed.isDone(true)) { + if (removed != null && removed.isSubmitted() && !removed.isDone(true)) { Entity context = BrooklynTaskTags.getContextEntity(removed); - if (context!=null && !Entities.isManaged(context)) { - log.debug("Deleting active task on unmanagement of "+context+": "+removed); + if (context != null && !Entities.isManaged(context)) { + log.debug("Deleting active task on unmanagement of " + context + ": " + removed); } else { if (removed.isDone()) { log.debug("Deleting cancelled task before completion: " + removed + "; this task will continue to run in the background outwith " + this); @@ -419,23 +441,31 @@ public class BasicExecutionManager implements ExecutionManager { public boolean isShutdown() { return runner.isShutdown(); } - - /** count of all tasks submitted */ + + /** + * count of all tasks submitted + */ public long getTotalTasksSubmitted() { return totalTaskCount.get(); } - - /** count of tasks submitted but not ended */ + + /** + * count of tasks submitted but not ended + */ public long getNumIncompleteTasks() { return incompleteTaskIds.size(); } - - /** count of tasks started but not ended */ + + /** + * count of tasks started but not ended + */ public long getNumActiveTasks() { return activeTaskCount.get(); } - /** count of tasks kept in memory, often including ended tasks */ + /** + * count of tasks kept in memory, often including ended tasks + */ public long getNumInMemoryTasks() { return tasksById.size(); } @@ -444,7 +474,7 @@ public class BasicExecutionManager implements ExecutionManager { Preconditions.checkNotNull(tag); synchronized (tasksByTag) { Set<Task<?>> result = tasksWithTagLiveOrNull(tag); - if (result==null) { + if (result == null) { result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>()); tasksByTag.put(tag, result); } @@ -452,7 +482,9 @@ public class BasicExecutionManager implements ExecutionManager { } } - /** exposes live view, for internal use only */ + /** + * exposes live view, for internal use only + */ @Beta public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) { synchronized (tasksByTag) { @@ -464,8 +496,10 @@ public class BasicExecutionManager implements ExecutionManager { public Task<?> getTask(String id) { return tasksById.get(id); } - - /** not on interface because potentially expensive */ + + /** + * not on interface because potentially expensive + */ public List<Task<?>> getAllTasks() { // not sure if synching makes any difference; have not observed CME's yet // (and so far this is only called when a CME was caught on a previous operation) @@ -473,23 +507,23 @@ public class BasicExecutionManager implements ExecutionManager { return MutableList.copyOf(tasksById.values()); } } - + @Override public Set<Task<?>> getTasksWithTag(Object tag) { Set<Task<?>> result = tasksWithTagLiveOrNull(tag); - if (result==null) return Collections.emptySet(); + if (result == null) return Collections.emptySet(); synchronized (result) { return Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result)); } } - + @Override public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) { Set<Task<?>> result = new LinkedHashSet<Task<?>>(); Iterator<?> ti = tags.iterator(); while (ti.hasNext()) { Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next()); - if (tasksForTag!=null) { + if (tasksForTag != null) { synchronized (tasksForTag) { result.addAll(tasksForTag); } @@ -498,7 +532,9 @@ public class BasicExecutionManager implements ExecutionManager { return Collections.unmodifiableSet(result); } - /** only works with at least one tag; returns empty if no tags */ + /** + * only works with at least one tag; returns empty if no tags + */ @Override public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) { //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!) @@ -509,7 +545,7 @@ public class BasicExecutionManager implements ExecutionManager { Iterator<?> ti = tags.iterator(); while (ti.hasNext()) { Object tag = ti.next(); - if (first) { + if (first) { first = false; result.addAll(getTasksWithTag(tag)); } else { @@ -519,49 +555,83 @@ public class BasicExecutionManager implements ExecutionManager { return Collections.unmodifiableSet(result); } - /** live view of all tasks, for internal use only */ + /** + * live view of all tasks, for internal use only + */ @Beta - public Collection<Task<?>> allTasksLive() { return tasksById.values(); } - + public Collection<Task<?>> allTasksLive() { + return tasksById.values(); + } + @Override - public Set<Object> getTaskTags() { + public Set<Object> getTaskTags() { synchronized (tasksByTag) { - return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); + return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); } } - @Override @Deprecated public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); } - @Override public Task<?> submit(String displayName, Runnable r) { return submit(MutableMap.of("displayName", displayName), r); } - @Override public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); } + @Override + @Deprecated + public Task<?> submit(Runnable r) { + return submit(new LinkedHashMap<Object, Object>(1), r); + } + + @Override + public Task<?> submit(String displayName, Runnable r) { + return submit(MutableMap.of("displayName", displayName), r); + } + + @Override + public Task<?> submit(Map<?, ?> flags, Runnable r) { + return submit(flags, new BasicTask<Void>(flags, r)); + } + + @Override + @Deprecated + public <T> Task<T> submit(Callable<T> c) { + return submit(new LinkedHashMap<Object, Object>(1), c); + } + + @Override + public <T> Task<T> submit(String displayName, Callable<T> c) { + return submit(MutableMap.of("displayName", displayName), c); + } - @Override @Deprecated public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); } - @Override public <T> Task<T> submit(String displayName, Callable<T> c) { return submit(MutableMap.of("displayName", displayName), c); } - @Override public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); } + @Override + public <T> Task<T> submit(Map<?, ?> flags, Callable<T> c) { + return submit(flags, new BasicTask<T>(flags, c)); + } - @Override public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); } + @Override + public <T> Task<T> submit(TaskAdaptable<T> t) { + return submit(new LinkedHashMap<Object, Object>(1), t); + } @Override - public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) { + public <T> Task<T> submit(Map<?, ?> flags, TaskAdaptable<T> task) { if (!(task instanceof Task)) task = task.asTask(); synchronized (task) { - if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task; + if (((TaskInternal<?>) task).getInternalFuture() != null) return (Task<T>) task; return submitNewTask(flags, (Task<T>) task); } } - public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); } - public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) { + public <T> Task<T> scheduleWith(Task<T> task) { + return scheduleWith(Collections.emptyMap(), task); + } + + public <T> Task<T> scheduleWith(Map<?, ?> flags, Task<T> task) { synchronized (task) { - if (((TaskInternal<?>)task).getInternalFuture()!=null) return task; + if (((TaskInternal<?>) task).getInternalFuture() != null) return task; return submitNewTask(flags, task); } } - protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) { + protected Task<?> submitNewScheduledTask(final Map<?, ?> flags, final ScheduledTask task) { boolean result = false; try { - BrooklynTaskLoggingMdc.logEvent("Submitting scheduled task", task, BrooklynTaskTags.getTargetOrContextEntity(Tasks.current())); + BrooklynTaskLoggingMdc.logStartEvent("Submitting scheduled task", task, null); result = submitSubsequentScheduledTask(flags, task); } finally { if (!result) { @@ -570,11 +640,11 @@ public class BasicExecutionManager implements ExecutionManager { } return task; } - - private boolean submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) { + + private boolean submitSubsequentScheduledTask(final Map<?, ?> flags, final ScheduledTask task) { if (!task.isDone()) { task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags), - task.delay.toNanoseconds(), TimeUnit.NANOSECONDS); + task.delay.toNanoseconds(), TimeUnit.NANOSECONDS); return true; } else { return false; @@ -583,7 +653,7 @@ public class BasicExecutionManager implements ExecutionManager { protected class ScheduledTaskCallable implements Callable<Object> { public ScheduledTask task; - public Map<?,?> flags; + public Map<?, ?> flags; public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) { this.task = task; @@ -591,12 +661,12 @@ public class BasicExecutionManager implements ExecutionManager { } @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({"rawtypes", "unchecked"}) public Object call() { TaskInternal<?> taskIteration = null; Throwable error = null; try { - if (task.startTimeUtc==-1) { + if (task.startTimeUtc == -1) { beforeSubmitScheduledTaskAllIterations(flags, task); beforeStartScheduledTaskAllIterations(flags, task); @@ -610,57 +680,60 @@ public class BasicExecutionManager implements ExecutionManager { final Callable<?> oldJob = taskIteration.getJob(); final TaskInternal<?> taskIterationF = taskIteration; - taskIteration.setJob(new Callable() { @Override public Object call() { - if (task.isCancelled()) { - CancellationException cancelDetected = new CancellationException("cancel detected"); - try { - afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, cancelDetected); - } finally { - // do in finally block so runs even if above throws cancelDetected - afterEndScheduledTaskAllIterations(flags, task, cancelDetected); - } - throw cancelDetected; - } - Throwable lastError = null; - boolean shouldResubmit = true; - task.recentRun = taskIterationF; - try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(taskIterationF).start()) { - beforeStartScheduledTaskSubmissionIteration(flags, task, taskIterationF); - synchronized (task) { - task.notifyAll(); - } - Object result; - try { - result = oldJob.call(); - task.lastThrownType = null; - } catch (Exception e) { - lastError = e; - shouldResubmit = shouldResubmitOnException(oldJob, e); - throw Exceptions.propagate(e); + taskIteration.setJob(new Callable() { + @Override + public Object call() { + if (task.isCancelled()) { + CancellationException cancelDetected = new CancellationException("cancel detected in scheduled job for " + task); + try { + afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, cancelDetected); + } finally { + // do in finally block so runs even if above throws cancelDetected + afterEndScheduledTaskAllIterations(flags, task, cancelDetected); + } + throw cancelDetected; } - return result; - } finally { - if (!task.isCancelled() || task.getEndTimeUtc()<=0) { - // don't re-run on cancellation - - afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, lastError); - // do in finally block in case we were interrupted - if (shouldResubmit && resubmit()) { - // resubmitted fine, no-op - } else { - // not resubmitted, note ending - afterEndScheduledTaskAllIterations(flags, task, lastError); + Throwable lastError = null; + boolean shouldResubmit = true; + task.recentRun = taskIterationF; + try (BrooklynTaskLoggingMdc mdc = BrooklynTaskLoggingMdc.create(taskIterationF).start()) { + beforeStartScheduledTaskSubmissionIteration(flags, task, taskIterationF); + synchronized (task) { + task.notifyAll(); + } + Object result; + try { + result = oldJob.call(); + task.lastThrownType = null; + } catch (Exception e) { + lastError = e; + shouldResubmit = shouldResubmitOnException(oldJob, e); + throw Exceptions.propagate(e); + } + return result; + } finally { + if (!task.isCancelled() || task.getEndTimeUtc() <= 0) { + // don't re-run on cancellation + + afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, lastError); + // do in finally block in case we were interrupted + if (shouldResubmit && resubmit()) { + // resubmitted fine, no-op + } else { + // not resubmitted, note ending + afterEndScheduledTaskAllIterations(flags, task, lastError); + } } } } - }}); + }); task.nextRun = taskIteration; ExecutionContext ec = // no longer associated the execution context on each execution; // BasicExecutionContext.getCurrentExecutionContext(); // instead it is set on the task task.executionContext; - if (ec!=null) return ec.submit(taskIteration); + if (ec != null) return ec.submit(taskIteration); else return submit(taskIteration); } catch (Exception e) { @@ -672,7 +745,7 @@ public class BasicExecutionManager implements ExecutionManager { private boolean resubmit() { task.runCount++; - if (task.period!=null && !task.isCancelled()) { + if (task.period != null && !task.isCancelled()) { task.delay = task.period; return submitSubsequentScheduledTask(flags, task); } else { @@ -704,7 +777,7 @@ public class BasicExecutionManager implements ExecutionManager { @Override public String toString() { - return "ScheduledTaskCallable["+task+","+flags+"]"; + return "ScheduledTaskCallable[" + task + "," + flags + "]"; } } @@ -727,8 +800,8 @@ public class BasicExecutionManager implements ExecutionManager { afterEndForCancelBeforeStart(flags, task, false); throw new CancellationException(); } - result = ((TaskInternal<T>)task).getJob().call(); - } catch(Throwable e) { + result = ((TaskInternal<T>) task).getJob().call(); + } catch (Throwable e) { error = e; } finally { afterEndAtomicTask(flags, task, error); @@ -738,7 +811,7 @@ public class BasicExecutionManager implements ExecutionManager { @Override public String toString() { - return "BEM.call("+task+","+flags+")"; + return "BEM.call(" + task + "," + flags + ")"; } } @@ -746,7 +819,7 @@ public class BasicExecutionManager implements ExecutionManager { private final Task<T> task; private BasicExecutionManager execMgmt; private final ExecutionList listeners; - + private CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, Future<T> delegate, ExecutionList list, Task<T> task) { super(delegate); this.listeners = list; @@ -767,48 +840,48 @@ public class BasicExecutionManager implements ExecutionManager { public ExecutionList getListeners() { return listeners; } - + @Override public boolean cancel(TaskCancellationMode mode) { boolean result = false; if (log.isTraceEnabled()) { - log.trace("CLFFT cancelling "+task+" mode "+mode); + log.trace("CLFFT cancelling " + task + " mode " + mode); } - if (!task.isCancelled()) result |= ((TaskInternal<T>)task).cancel(mode); + if (!task.isCancelled()) result |= ((TaskInternal<T>) task).cancel(mode); result |= delegate().cancel(mode.isAllowedToInterruptTask()); - + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { - int subtasksFound=0; - int subtasksReallyCancelled=0; - + int subtasksFound = 0; + int subtasksReallyCancelled = 0; + if (task instanceof HasTaskChildren) { // cancel tasks in reverse order -- // it should be the case that if child1 is cancelled, // a parentTask should NOT call a subsequent child2, // but just in case, we cancel child2 first // NB: DST and others may apply their own recursive cancel behaviour - MutableList<Task<?>> childrenReversed = MutableList.copyOf( ((HasTaskChildren)task).getChildren() ); + MutableList<Task<?>> childrenReversed = MutableList.copyOf(((HasTaskChildren) task).getChildren()); Collections.reverse(childrenReversed); - - for (Task<?> child: childrenReversed) { + + for (Task<?> child : childrenReversed) { if (log.isTraceEnabled()) { - log.trace("Cancelling "+child+" on recursive cancellation of "+task); + log.trace("Cancelling " + child + " on recursive cancellation of " + task); } subtasksFound++; - if (((TaskInternal<?>)child).cancel(mode)) { + if (((TaskInternal<?>) child).cancel(mode)) { result = true; subtasksReallyCancelled++; } } } - for (Task<?> t: execMgmt.getAllTasks()) { + for (Task<?> t : execMgmt.getAllTasks()) { if (task.equals(t.getSubmittedByTask())) { if (mode.isAllowedToInterruptAllSubmittedTasks() || BrooklynTaskTags.isTransient(t)) { if (log.isTraceEnabled()) { - log.trace("Cancelling "+t+" on recursive cancellation of "+task); + log.trace("Cancelling " + t + " on recursive cancellation of " + task); } subtasksFound++; - if (((TaskInternal<?>)t).cancel(mode)) { + if (((TaskInternal<?>) t).cancel(mode)) { result = true; subtasksReallyCancelled++; } @@ -816,10 +889,10 @@ public class BasicExecutionManager implements ExecutionManager { } } if (log.isTraceEnabled()) { - log.trace("On cancel of "+task+", applicable subtask count "+subtasksFound+", of which "+subtasksReallyCancelled+" were actively cancelled"); + log.trace("On cancel of " + task + ", applicable subtask count " + subtasksFound + ", of which " + subtasksReallyCancelled + " were actively cancelled"); } } - + execMgmt.afterEndForCancelBeforeStart(null, task, true); return result; } @@ -841,7 +914,7 @@ public class BasicExecutionManager implements ExecutionManager { try { listener.onTaskDone(task); } catch (Exception e) { - log.warn("Error running execution listener "+listener+" of task "+task+" done", e); + log.warn("Error running execution listener " + listener + " of task " + task + " done", e); } } // run any listeners the task owner has added to its future @@ -850,46 +923,47 @@ public class BasicExecutionManager implements ExecutionManager { } @SuppressWarnings("unchecked") - protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) { + protected <T> Task<T> submitNewTask(final Map<?, ?> flags, final Task<T> task) { if (log.isTraceEnabled()) { - log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}", - new Object[] {task.getId(), task, Sanitizer.sanitize(flags), BrooklynTaskTags.getTagsFast(task), - (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>"), - Tasks.current() }); - if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) { - log.trace("Stack trace for unparented submission of transient "+task, new Throwable("trace only (not an error)")); + log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}; caller {}", + new Object[]{task.getId(), task, Sanitizer.sanitize(flags), BrooklynTaskTags.getTagsFast(task), + (task instanceof TaskInternal ? ((TaskInternal<T>) task).getJob() : "<unavailable>"), + Tasks.current()}); + if (Tasks.current() == null && BrooklynTaskTags.isTransient(task)) { + log.trace("Stack trace for unparented submission of transient " + task, new Throwable("trace only (not an error)")); } } - + if (task instanceof ScheduledTask) { return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask) task); } - + beforeSubmitAtomicTask(flags, task); - - if (((TaskInternal<T>)task).getJob() == null) - throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied."); - + + if (((TaskInternal<T>) task).getJob() == null) + throw new NullPointerException("Task " + task + " submitted with with null job: job must be supplied."); + Callable<T> job = new SubmissionCallable<T>(flags, task); - + // If there's a scheduler then use that; otherwise execute it directly Set<TaskScheduler> schedulers = null; - for (Object tago: BrooklynTaskTags.getTagsFast(task)) { + for (Object tago : BrooklynTaskTags.getTagsFast(task)) { TaskScheduler scheduler = getTaskSchedulerForTag(tago); - if (scheduler!=null) { - if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2); + if (scheduler != null) { + if (schedulers == null) schedulers = new LinkedHashSet<TaskScheduler>(2); schedulers.add(scheduler); } } Future<T> future; - if (schedulers!=null && !schedulers.isEmpty()) { - if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers); + if (schedulers != null && !schedulers.isEmpty()) { + if (schedulers.size() > 1) + log.warn("multiple schedulers detected, using only the first, for " + task + ": " + schedulers); future = schedulers.iterator().next().submit(job); } else { future = runner.submit(job); } afterSubmitRecordFuture(task, future); - + return task; } @@ -898,117 +972,136 @@ public class BasicExecutionManager implements ExecutionManager { // this future allows a caller to add custom listeners // (it does not notify the listeners; that's our job); // except on cancel we want to listen - CancellingListenableForwardingFutureForTask<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>)task).getListeners(), task); + CancellingListenableForwardingFutureForTask<T> listenableFuture = new CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>) task).getListeners(), task); // and we want to make sure *our* (manager) listeners are given suitable callback - ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallManagerListeners<T>(task, listenableFuture), runner); + ((TaskInternal<T>) task).addListener(new SubmissionListenerToCallManagerListeners<T>(task, listenableFuture), runner); // NB: can the above mean multiple callbacks to TaskInternal#runListeners? - + // finally expose the future to callers - ((TaskInternal<T>)task).initInternalFuture(listenableFuture); + ((TaskInternal<T>) task).initInternalFuture(listenableFuture); } - - protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) { + + protected void beforeSubmitScheduledTaskAllIterations(Map<?, ?> flags, Task<?> task) { // for these, beforeSubmitAtomicTask is not called, // but beforeStartAtomic and afterSubmitAtomic _are_ called internalBeforeSubmit(flags, task); } - protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) { + + protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?, ?> flags, Task<?> task) { internalBeforeSubmit(flags, task); } - protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) { + + protected void beforeSubmitAtomicTask(Map<?, ?> flags, Task<?> task) { internalBeforeSubmit(flags, task); } - protected void beforeSubmitInSameThreadTask(Map<?,?> flags, Task<?> task) { + + protected void beforeSubmitInSameThreadTask(Map<?, ?> flags, Task<?> task) { internalBeforeSubmit(flags, task); } - /** invoked when a task is submitted */ - protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) { + + /** + * invoked when a task is submitted + */ + protected void internalBeforeSubmit(Map<?, ?> flags, Task<?> task) { incompleteTaskIds.add(task.getId()); - - if (task.getSubmittedByTaskId()==null) { + + if (task.getSubmittedByTaskId() == null) { Task<?> currentTask = Tasks.current(); - if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask( + if (currentTask != null) ((TaskInternal<?>) task).setSubmittedByTask( // do this instead of soft reference (2017-09) as soft refs impact GC Maybe.of(new TaskLookup(this, currentTask)), currentTask.getId()); } - ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis()); - - if (flags!=null && flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag")); - if (flags!=null && flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags")); + ((TaskInternal<?>) task).setSubmitTimeUtc(System.currentTimeMillis()); - for (Object tag: BrooklynTaskTags.getTagsFast(task)) { + if (flags != null && flags.get("tag") != null) + ((TaskInternal<?>) task).getMutableTags().add(flags.remove("tag")); + if (flags != null && flags.get("tags") != null) + ((TaskInternal<?>) task).getMutableTags().addAll((Collection<?>) flags.remove("tags")); + + for (Object tag : BrooklynTaskTags.getTagsFast(task)) { tasksWithTagCreating(tag).add(task); } - + tasksById.put(task.getId(), task); totalTaskCount.incrementAndGet(); } - + private static class TaskLookup implements Supplier<Task<?>> { // this class is not meant to be serialized, but if it is, make sure exec mgr doesn't sneak in transient BasicExecutionManager mgr; - + String id; String displayName; + public TaskLookup(BasicExecutionManager mgr, Task<?> t) { this.mgr = mgr; id = t.getId(); - if (mgr.getTask(id)==null) { - log.warn("Created task lookup for task which is not registered: "+t); + if (mgr.getTask(id) == null) { + log.warn("Created task lookup for task which is not registered: " + t); } displayName = t.getDisplayName(); } + @Override public Task<?> get() { - if (mgr==null) return gone(); + if (mgr == null) return gone(); Task<?> result = mgr.getTask(id); - if (result!=null) return result; + if (result != null) return result; return gone(); } + private <T> Task<T> gone() { - Task<T> t = Tasks.<T>builder().dynamic(false).displayName(displayName+" (placeholder for "+id+")") - .description("Details of the original task have been forgotten.") - .body(Callables.returning((T)null)).build(); + Task<T> t = Tasks.<T>builder().dynamic(false).displayName(displayName + " (placeholder for " + id + ")") + .description("Details of the original task have been forgotten.") + .body(Callables.returning((T) null)).build(); // don't really want anyone executing the "gone" task... // also if we are GC'ing tasks then cancelled may help with cleanup // of sub-tasks that have lost their submitted-by-task reference ? // also don't want warnings when it's finalized, this means we don't need ignoreIfNotRun() - ((BasicTask<T>)t).cancelled = true; + ((BasicTask<T>) t).cancelled = true; return t; } } - /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */ - protected void beforeStartScheduledTaskAllIterations(Map<?,?> flags, ScheduledTask taskDoingTheInitialSchedule) { - taskDoingTheInitialSchedule.mdc = BrooklynTaskLoggingMdc.create(taskDoingTheInitialSchedule).start(); + /** + * normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} + */ + protected void beforeStartScheduledTaskAllIterations(Map<?, ?> flags, ScheduledTask taskDoingTheInitialSchedule) { + BrooklynTaskLoggingMdc.logStartEvent("Starting scheduled task iterations", taskDoingTheInitialSchedule, null); internalBeforeStart(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, true, true); } - protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheScheduling, Task<?> taskIteration) { + + protected void beforeStartScheduledTaskSubmissionIteration(Map<?, ?> flags, Task<?> taskDoingTheScheduling, Task<?> taskIteration) { // no-op, because handled as an atomic task // internalBeforeStart(flags, taskIteration, true, false); } - protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) { + + protected void beforeStartAtomicTask(Map<?, ?> flags, Task<?> task) { internalBeforeStart(flags, task, false, true, false); } - protected void beforeStartInSameThreadTask(Map<?,?> flags, Task<?> task) { + + protected void beforeStartInSameThreadTask(Map<?, ?> flags, Task<?> task) { internalBeforeStart(flags, task, false, false, false); } - - /** invoked in a task's thread when a task is starting to run (may be some time after submitted), - * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ - protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean skipIncrementCounter, boolean allowJitter, boolean startingThisThreadMightEndElsewhere) { + + /** + * invoked in a task's thread when a task is starting to run (may be some time after submitted), + * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks + */ + protected void internalBeforeStart(Map<?, ?> flags, Task<?> task, boolean skipIncrementCounter, boolean allowJitter, boolean startingThisThreadMightEndElsewhere) { int count = skipIncrementCounter ? activeTaskCount.get() : activeTaskCount.incrementAndGet(); - if (count % 1000==0 && count>0) { - log.warn("High number of active tasks: task #"+count+" is "+task); + if (count % 1000 == 0 && count > 0) { + log.warn("High number of active tasks: task #" + count + " is " + task); } - + //set thread _before_ start time, so we won't get a null thread when there is a start-time - if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task + " running on thread " + Thread.currentThread().getName()); + if (log.isTraceEnabled()) + log.trace("" + this + " beforeStart, task: " + task + " running on thread " + Thread.currentThread().getName()); if (!task.isCancelled()) { Thread thread = Thread.currentThread(); - ((TaskInternal<?>)task).setThread(thread); + ((TaskInternal<?>) task).setThread(thread); if (!startingThisThreadMightEndElsewhere) { if (RENAME_THREADS) { threadOriginalName.set(thread.getName()); @@ -1017,13 +1110,13 @@ public class BasicExecutionManager implements ExecutionManager { } PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); } - ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis()); + ((TaskInternal<?>) task).setStartTimeUtc(System.currentTimeMillis()); } if (allowJitter) { jitterThreadStart(task); } - if (flags!=null && !startingThisThreadMightEndElsewhere) { + if (flags != null && !startingThisThreadMightEndElsewhere) { invokeCallback(flags.get("newTaskStartCallback"), task); } } @@ -1039,7 +1132,7 @@ public class BasicExecutionManager implements ExecutionManager { } } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) // not ideal, such loose typing on the callback -- should prefer Function<Task,Object> // but at least it's package-private static Object invokeCallback(Object callable, Task<?> task) { @@ -1048,60 +1141,75 @@ public class BasicExecutionManager implements ExecutionManager { log.warn("Use of groovy.lang.Closure is deprecated, in ExecutionManager.invokeCallback"); loggedClosureDeprecatedInInvokeCallback = true; } - return ((Closure<?>)callable).call(task); + return ((Closure<?>) callable).call(task); } if (callable instanceof Callable) { try { - return ((Callable<?>)callable).call(); + return ((Callable<?>) callable).call(); } catch (Throwable t) { throw Exceptions.propagate(t); } } - if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; } - if (callable instanceof Function) { return ((Function)callable).apply(task); } - if (callable==null) return null; - throw new IllegalArgumentException("Cannot invoke unexpected callback object "+callable+" of type "+callable.getClass()+" on "+task); + if (callable instanceof Runnable) { + ((Runnable) callable).run(); + return null; + } + if (callable instanceof Function) { + return ((Function) callable).apply(task); + } + if (callable == null) return null; + throw new IllegalArgumentException("Cannot invoke unexpected callback object " + callable + " of type " + callable.getClass() + " on " + task); } + private static boolean loggedClosureDeprecatedInInvokeCallback; - - /** normally (if not interrupted) called once for each call to {@link #beforeStartScheduledTaskAllIterations(Map, ScheduledTask)} */ - protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, ScheduledTask taskDoingTheInitialSchedule, Throwable error) { + + /** + * normally (if not interrupted) called once for each call to {@link #beforeStartScheduledTaskAllIterations(Map, ScheduledTask)} + */ + protected void afterEndScheduledTaskAllIterations(Map<?, ?> flags, ScheduledTask taskDoingTheInitialSchedule, Throwable error) { boolean taskWasSubmittedAndNotYetEnded = true; try { taskWasSubmittedAndNotYetEnded = internalAfterEnd(flags, taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, false, error); } finally { - if (taskDoingTheInitialSchedule.mdc!=null) { - taskDoingTheInitialSchedule.mdc.close(); - taskDoingTheInitialSchedule.mdc = null; + BrooklynTaskLoggingMdc.logEndEvent("Ending scheduled task iterations", taskDoingTheInitialSchedule); + synchronized (taskDoingTheInitialSchedule) { + taskDoingTheInitialSchedule.notifyAll(); } - synchronized (taskDoingTheInitialSchedule) { taskDoingTheInitialSchedule.notifyAll(); } if (taskWasSubmittedAndNotYetEnded) { // prevent from running twice on cancellation after start ((TaskInternal<?>) taskDoingTheInitialSchedule).runListeners(); } } } - /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)}, - * with a per-iteration task generated by the surrounding scheduled task */ - protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) { + + /** + * called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)}, + * with a per-iteration task generated by the surrounding scheduled task + */ + protected void afterEndScheduledTaskSubmissionIteration(Map<?, ?> flags, Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) { // no-op because handled as an atomic task // internalAfterEnd(flags, taskIteration, false, true, error); } - /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked, - * and normally (if not interrupted prior to start) - * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */ - protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task, Throwable error) { + + /** + * called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked, + * and normally (if not interrupted prior to start) + * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} + */ + protected void afterEndAtomicTask(Map<?, ?> flags, Task<?> task, Throwable error) { internalAfterEnd(flags, task, false, true, error); } - protected void afterEndInSameThreadTask(Map<?,?> flags, Task<?> task, Throwable error) { + + protected void afterEndInSameThreadTask(Map<?, ?> flags, Task<?> task, Throwable error) { internalAfterEnd(flags, task, false, true, error); } - protected void afterEndForCancelBeforeStart(Map<?,?> flags, Task<?> task, boolean calledFromCanceller) { + + protected void afterEndForCancelBeforeStart(Map<?, ?> flags, Task<?> task, boolean calledFromCanceller) { if (calledFromCanceller) { if (task.isBegun()) { // do nothing from canceller thread if task has begin; // we don't want to set end time or listeners prematurely. - return ; + return; } else { // normally task won't be submitted by executor, so do some of the end operations. // there is a chance task has begun but not set start time, @@ -1113,15 +1221,17 @@ public class BasicExecutionManager implements ExecutionManager { } internalAfterEnd(flags, task, true, !calledFromCanceller, null); } - - /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)}, - * and, for atomic tasks and scheduled-task submission iterations where + + /** + * normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)}, + * and, for atomic tasks and scheduled-task submission iterations where * always called once if {@link #internalBeforeStart(Map, Task, boolean, boolean, boolean)} is invoked and if possible - * (but not possible for isEndingAllIterations) in the same thread as that method */ - protected boolean internalAfterEnd(Map<?,?> flags, Task<?> task, boolean skipDecrementCounter, boolean startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) { + * (but not possible for isEndingAllIterations) in the same thread as that method + */ + protected boolean internalAfterEnd(Map<?, ?> flags, Task<?> task, boolean skipDecrementCounter, boolean startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) { boolean taskWasSubmittedAndNotYetEnded = true; try { - if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task); + if (log.isTraceEnabled()) log.trace(this + " afterEnd, task: " + task); taskWasSubmittedAndNotYetEnded = incompleteTaskIds.remove(task.getId()); // this method might be called more than once, eg if cancelled, so use the above as a guard where single invocation is needed (eg counts) @@ -1129,13 +1239,13 @@ public class BasicExecutionManager implements ExecutionManager { activeTaskCount.decrementAndGet(); } - if (flags!=null && taskWasSubmittedAndNotYetEnded && startedGuaranteedToEndInSameThreadAndEndingSameThread) { + if (flags != null && taskWasSubmittedAndNotYetEnded && startedGuaranteedToEndInSameThreadAndEndingSameThread) { invokeCallback(flags.get("newTaskEndCallback"), task); } - if (task.getEndTimeUtc()>0) { + if (task.getEndTimeUtc() > 0) { if (taskWasSubmittedAndNotYetEnded) { // shouldn't happen - log.debug("Task "+task+" has end time "+task.getEndTimeUtc()+" but was marked as incomplete"); + log.debug("Task " + task + " has end time " + task.getEndTimeUtc() + " but was marked as incomplete"); } } else { ((TaskInternal<?>) task).setEndTimeUtc(System.currentTimeMillis()); @@ -1146,7 +1256,7 @@ public class BasicExecutionManager implements ExecutionManager { //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time if (RENAME_THREADS) { Thread thread = task.getThread(); - if (thread==null) { + if (thread == null) { log.warn("BasicTask.afterEnd invoked without corresponding beforeStart"); } else { thread.setName(threadOriginalName.get()); @@ -1154,7 +1264,7 @@ public class BasicExecutionManager implements ExecutionManager { } } } - ((TaskInternal<?>)task).setThread(null); + ((TaskInternal<?>) task).setThread(null); // if uninteresting and transient and scheduled, go ahead and remove from task tags also, so it won't be reported as GC'd if (BrooklynTaskTags.isTransient(task) && UNINTERESTING_TASK_NAMES.contains(task.getDisplayName()) && task.getSubmittedByTask() instanceof ScheduledTask) { @@ -1163,7 +1273,7 @@ public class BasicExecutionManager implements ExecutionManager { } finally { try { - if (error!=null) { + if (error != null) { /* we throw, after logging debug. * the throw means the error is available for task submitters to monitor. * however it is possible no one is monitoring it, in which case we will have debug logging only for errors. @@ -1174,26 +1284,28 @@ public class BasicExecutionManager implements ExecutionManager { if (log.isDebugEnabled()) { // debug only here, because most submitters will handle failures if (error instanceof InterruptedException || error instanceof RuntimeInterruptedException) { - log.debug("Detected interruption on task "+task+" (rethrowing)" + - (Strings.isNonBlank(error.getMessage()) ? ": "+error.getMessage() : "")); + log.debug("Detected interruption on task " + task + " (rethrowing)" + + (Strings.isNonBlank(error.getMessage()) ? ": " + error.getMessage() : "")); } else if (error instanceof NullPointerException || error instanceof IndexOutOfBoundsException || error instanceof ClassCastException) { - log.debug("Exception running task "+task+" (rethrowing): "+error, error); + log.debug("Exception running task " + task + " (rethrowing): " + error, error); } else { - log.debug("Exception running task "+task+" (rethrowing): "+error); + log.debug("Exception running task " + task + " (rethrowing): " + error); } if (log.isTraceEnabled()) { - log.trace("Trace for exception running task "+task+" (rethrowing): "+error, error); + log.trace("Trace for exception running task " + task + " (rethrowing): " + error, error); } } throw Exceptions.propagate(error); } } finally { - synchronized (task) { task.notifyAll(); } + synchronized (task) { + task.notifyAll(); + } if (taskWasSubmittedAndNotYetEnded) { // prevent from running twice on cancellation after start - ((TaskInternal<?>)task).runListeners(); + ((TaskInternal<?>) task).runListeners(); } } } @@ -1203,22 +1315,22 @@ public class BasicExecutionManager implements ExecutionManager { public TaskScheduler getTaskSchedulerForTag(Object tag) { return schedulerByTag.get(tag); } - + public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) { synchronized (schedulerByTag) { TaskScheduler old = getTaskSchedulerForTag(tag); - if (old!=null) { + if (old != null) { if (scheduler.isAssignableFrom(old.getClass())) { /* already have such an instance */ return; } //might support multiple in future... - throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new "+scheduler+")"); + throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag " + tag + ", has " + old + ", setting new " + scheduler + ")"); } try { TaskScheduler schedulerI = scheduler.newInstance(); // allow scheduler to have a nice name, for logging etc - if (schedulerI instanceof CanSetName) ((CanSetName)schedulerI).setName(""+tag); + if (schedulerI instanceof CanSetName) ((CanSetName) schedulerI).setName("" + tag); setTaskSchedulerForTag(tag, schedulerI); } catch (InstantiationException e) { throw Exceptions.propagate(e); @@ -1227,10 +1339,10 @@ public class BasicExecutionManager implements ExecutionManager { } } } - + /** * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag. - * + * <p> * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class) * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two. * @@ -1241,9 +1353,9 @@ public class BasicExecutionManager implements ExecutionManager { scheduler.injectExecutor(runner); Object old = schedulerByTag.put(tag, scheduler); - if (old!=null && old!=scheduler) { + if (old != null && old != scheduler) { //might support multiple in future... - throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")"); + throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag " + tag + ")"); } } } @@ -1257,10 +1369,10 @@ public class BasicExecutionManager implements ExecutionManager { public boolean clearTaskSchedulerForTag(Object tag) { synchronized (schedulerByTag) { Object old = schedulerByTag.remove(tag); - return (old!=null); + return (old != null); } } - + @VisibleForTesting public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() { return schedulerByTag; diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java index 4422209..b6a7029 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java @@ -18,7 +18,10 @@ */ package org.apache.brooklyn.util.core.task; +import org.apache.brooklyn.api.internal.BrooklynLoggingCategories; import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.core.BrooklynLogging; +import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel; import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis; import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; @@ -53,8 +56,6 @@ public class ScheduledTask extends BasicTask<Object> { final Callable<Task<?>> taskFactory; - protected BrooklynTaskLoggingMdc mdc; - /** * Initial delay before running, set as flag in constructor; defaults to 0 */ @@ -266,6 +267,7 @@ public class ScheduledTask extends BasicTask<Object> { @Override protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) { + BrooklynLogging.log(BrooklynLoggingCategories.TASK_LIFECYCLE_LOG, LoggingLevel.DEBUG, "Cancelling scheduled task "+this); if (nextRun!=null) { ((TaskInternal<?>)nextRun).cancel(mode); try { diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java index c0cde50..f207001 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java @@ -33,6 +33,11 @@ public class RuntimeInterruptedException extends RuntimeException { private static final long serialVersionUID = 915050245927866175L; + public RuntimeInterruptedException(String msg) { + super(msg); + Thread.currentThread().interrupt(); + } + public RuntimeInterruptedException(InterruptedException cause) { super(cause); Thread.currentThread().interrupt();
