This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit b760aead06eea6f2a0e395497a4de48a5fc722f8
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Sep 14 17:03:28 2021 +0100

    Fix race where the persister might still be reading from hot standby.
    
    Doesn't affect behaviour; it just prevents errors.
    Also improves scheduled task logging.
---
 .../rebind/mementos/BrooklynMementoPersister.java  |   2 +
 .../core/mgmt/ha/HighAvailabilityManagerImpl.java  |   5 +
 .../BrooklynMementoPersisterToObjectStore.java     |  47 +-
 .../core/mgmt/rebind/RebindManagerImpl.java        |   7 +
 .../util/core/task/BasicExecutionManager.java      | 682 ++++++++++++---------
 .../brooklyn/util/core/task/ScheduledTask.java     |   6 +-
 .../exceptions/RuntimeInterruptedException.java    |   5 +
 7 files changed, 456 insertions(+), 298 deletions(-)

diff --git 
a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
 
b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
index 2090c80..163a0ff 100644
--- 
a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
+++ 
b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
@@ -109,6 +109,8 @@ public interface BrooklynMementoPersister {
     void disableWriteAccess(boolean graceful);
     /** permanently shuts down all access to the remote store */
     void stop(boolean graceful);
+    /** cancels/waits for all current tasks */
+    void reset();
 
     @VisibleForTesting
     void waitForWritesCompleted(Duration timeout) throws InterruptedException, 
TimeoutException;
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
index 8c63775..9fa2ead 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
@@ -891,6 +891,11 @@ public class HighAvailabilityManagerImpl implements 
HighAvailabilityManager {
 
     protected void promoteToMaster() {
         LOG.debug("Promoting to master: "+this);
+        if (Tasks.current()!=null) {
+            // let us check if promotion is happening in the right context
+            Task task = Tasks.current();
+            LOG.debug("Task context for master promotion: "+task+" 
("+task.getTags()+"); "+task.getStatusSummary());
+        }
         if (!running) {
             LOG.warn("Ignoring promote-to-master request, as 
HighAvailabilityManager is not running");
             return;
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
index 5f16faf..e3b3540 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
@@ -43,6 +43,7 @@ import 
org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister;
+import 
org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.ManagedBundleMemento;
@@ -71,6 +72,7 @@ import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.xstream.XmlUtil;
 import org.apache.brooklyn.util.exceptions.CompoundRuntimeException;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
 import org.apache.brooklyn.util.time.Time;
@@ -117,7 +119,7 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
 
     private final Map<String, StoreObjectAccessorWithLock> writers = new 
LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
 
-    private final ListeningExecutorService executor;
+    private ListeningExecutorService executor;
 
     private volatile boolean writesAllowed = false;
     private volatile boolean writesShuttingDown = false;
@@ -154,8 +156,6 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
                 .withClassLoader(classLoader).build();
         this.serializerWithStandardClassLoader = new 
RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts);
 
-        int maxThreadPoolSize = 
brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
-
         objectStore.createSubPath("entities");
         objectStore.createSubPath("locations");
         objectStore.createSubPath("policies");
@@ -166,11 +166,11 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
         // FIXME does it belong here or to 
ManagementPlaneSyncRecordPersisterToObjectStore ?
         objectStore.createSubPath("plane");
         
-        executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize,
 new ThreadFactory() {
-            @Override public Thread newThread(Runnable r) {
-                // Note: Thread name referenced in logback-includes' 
ThreadNameDiscriminator
-                return new Thread(r, "brooklyn-persister");
-            }}));
+        resetExecutor();
+    }
+
+    private Integer maxThreadPoolSize() {
+        return brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
     }
 
     public MementoSerializer<Object> getMementoSerializer() {
@@ -210,8 +210,14 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
         final ManagementContext managementContext = 
lookupContext.lookupManagementContext();
         RegisteredType catalogItem = 
managementContext.getTypeRegistry().get(catalogItemId);
         if (catalogItem == null) {
-            // TODO do we need to only log once, rather than risk log.warn too 
often? I think this only happens on rebind, so ok.
-            LOG.warn("Unable to load catalog item "+catalogItemId
+            // will happen on rebind if our execution should have ended
+            if (Thread.interrupted()) {
+                LOG.debug("Aborting (probably old) rebind iteration");
+                throw new RuntimeInterruptedException("Rebind iteration 
cancelled");
+            }
+
+            // might come here for other reasons too
+            LOG.debug("Unable to load registered type "+catalogItemId
                 +" for custom class loader of " + type + " " + objectId + "; 
will use default class loader");
             return null;
         } else {
@@ -245,7 +251,24 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
     @Override 
     public void stop(boolean graceful) {
         disableWriteAccess(graceful);
-        
+        stopExecutor(graceful);
+    }
+
+    @Override
+    public void reset() {
+        resetExecutor();
+    }
+
+    public void resetExecutor() {
+        stopExecutor(false);
+        executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize(),
 new ThreadFactory() {
+            @Override public Thread newThread(Runnable r) {
+                // Note: Thread name referenced in logback-includes' 
ThreadNameDiscriminator
+                return new Thread(r, "brooklyn-persister");
+            }}));
+    }
+
+    public void stopExecutor(boolean graceful) {
         if (executor != null) {
             if (graceful) {
                 executor.shutdown();
@@ -473,6 +496,8 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
     
     @Override
     public BrooklynMemento loadMemento(BrooklynMementoRawData mementoData, 
final LookupContext lookupContext, final RebindExceptionHandler 
exceptionHandler) throws IOException {
+        LOG.debug("Loading mementos");
+
         if (mementoData==null)
             mementoData = loadMementoRawData(exceptionHandler);
 
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 899e10b..c45c96f 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -396,6 +396,9 @@ public class RebindManagerImpl implements RebindManager {
                 LOG.warn("Rebind (read-only) tasks took too long to die after 
interrupt (ignoring): "+readOnlyTask);
             }
             readOnlyTask = null;
+            if (persistenceStoreAccess!=null) {
+                persistenceStoreAccess.reset();
+            }
             LOG.debug("Stopped read-only rebinding ("+this+"), mgmt 
"+managementContext.getManagementNodeId());
 
             // short waits when promoting
@@ -404,6 +407,7 @@ public class RebindManagerImpl implements RebindManager {
                     Duration.seconds(5));
             // note, items are subsequently unmanaged via:
             // HighAvailabilityManagerImpl.clearManagedItems
+            readOnlyRebindCount.set(0);
         }
     }
 
@@ -467,6 +471,9 @@ public class RebindManagerImpl implements RebindManager {
     @Override
     public void reset() {
         if (persistenceRealChangeListener != null && 
!persistenceRealChangeListener.isActive()) 
persistenceRealChangeListener.reset();
+        if (persistenceStoreAccess!=null) {
+            persistenceStoreAccess.reset();
+        }
     }
     
     @Override
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 41a704e..88812e0 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -59,6 +59,7 @@ import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.core.BrooklynLogging;
+import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel;
 import org.apache.brooklyn.core.config.Sanitizer;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
@@ -133,16 +134,18 @@ public class BasicExecutionManager implements 
ExecutionManager {
         public static BrooklynTaskLoggingMdc create() {
             return new BrooklynTaskLoggingMdc();
         }
+
         public static BrooklynTaskLoggingMdc create(Task task) {
             return new BrooklynTaskLoggingMdc().withTask(task);
         }
 
         boolean isRedundant = false;
         Task task;
-        MDC.MDCCloseable taskMdc=null, entityMdc=null;
+        MDC.MDCCloseable taskMdc = null, entityMdc = null;
         String prevTaskMdc, prevEntityMdc;
 
-        private BrooklynTaskLoggingMdc() {}
+        private BrooklynTaskLoggingMdc() {
+        }
 
         public BrooklynTaskLoggingMdc withTask(Task task) {
             this.task = task;
@@ -156,7 +159,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
             // the log _message_ shows who submitted it, which is more 
reliable than
             // the prevTaskMdc as we might be in a different thread and/or the 
prevTaskMdc
             // can misleadingly point point to the task which triggered the 
executor
-            if (task!=null) {
+            if (task != null) {
                 prevTaskMdc = MDC.get(LOGGING_MDC_KEY_TASK_ID);
                 if (Objects.equals(task.getId(), prevTaskMdc)) {
                     isRedundant = true;
@@ -172,16 +175,20 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 entityMdc = MDC.putCloseable(LOGGING_MDC_KEY_ENTITY_IDS, "");
             }
 
-            logEvent("Starting task", task, entity);
+            logStartEvent("Starting task", task, entity);
 
             return this;
         }
 
-        public static void logEvent(String prefix, Task task, Entity entity) {
+        public static void logStartEvent(String prefix, Task task, Entity 
entity) {
             if (BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isDebugEnabled() 
|| BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isTraceEnabled()) {
+                if (entity==null) {
+                    entity = 
BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
+                }
+
                 String taskName = task.getDisplayName();
                 String message = prefix + " " + task.getId() +
-                        (Strings.isNonBlank(taskName) ? " ("+taskName+")" : 
"") +
+                        (Strings.isNonBlank(taskName) ? " (" + taskName + ")" 
: "") +
                         (entity == null ? "" : " on entity " + entity.getId()) 
+
                         (Strings.isNonBlank(task.getSubmittedByTaskId()) ? " 
from task " + task.getSubmittedByTaskId() : "") +
                         Entitlements.getEntitlementContextUserMaybe().
@@ -197,24 +204,29 @@ public class BasicExecutionManager implements 
ExecutionManager {
             }
         }
 
-        public void finish() {
-            if (isRedundant) {
-                return;
-            }
-            if (BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isDebugEnabled() 
|| BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isTraceEnabled()){
+        public static void logEndEvent(String prefix, Task task) {
+            if (BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isDebugEnabled() 
|| BrooklynLoggingCategories.TASK_LIFECYCLE_LOG.isTraceEnabled()) {
                 String taskName = task.getDisplayName();
                 
BrooklynLogging.log(BrooklynLoggingCategories.TASK_LIFECYCLE_LOG,
                         UNINTERESTING_TASK_NAMES.contains(taskName) ? 
BrooklynLogging.LoggingLevel.TRACE : BrooklynLogging.LoggingLevel.DEBUG,
-                        "Ending task {}", task.getId());
+                        prefix + " " + task.getId());
+            }
+        }
+
+        public void finish() {
+            if (isRedundant) {
+                return;
             }
+            // not normally used for scheduled tasks
+            logEndEvent(task instanceof ScheduledTask ? "Ending scheduled task 
context" : "Ending task", task);
             if (entityMdc != null) {
                 entityMdc.close();
-                if (prevEntityMdc!=null) MDC.put(LOGGING_MDC_KEY_ENTITY_IDS, 
prevEntityMdc);
+                if (prevEntityMdc != null) MDC.put(LOGGING_MDC_KEY_ENTITY_IDS, 
prevEntityMdc);
                 prevEntityMdc = null;
             }
             if (taskMdc != null) {
                 taskMdc.close();
-                if (prevTaskMdc!=null) MDC.put(LOGGING_MDC_KEY_TASK_ID, 
prevTaskMdc);
+                if (prevTaskMdc != null) MDC.put(LOGGING_MDC_KEY_TASK_ID, 
prevTaskMdc);
                 prevTaskMdc = null;
             }
         }
@@ -224,131 +236,141 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
     }
 
-    public static void registerUninterestingTaskName(String taskName){
-        registerUninterestingTaskName(taskName,false);
+    public static void registerUninterestingTaskName(String taskName) {
+        registerUninterestingTaskName(taskName, false);
     }
-    public static void registerUninterestingTaskName(String taskName, boolean 
registerScheduledPrefix){
-        log.debug("Registering '{}' as UninterestingTaskName. Starting 
finishing trace will be log as trace",taskName);
+
+    public static void registerUninterestingTaskName(String taskName, boolean 
registerScheduledPrefix) {
+        log.debug("Registering '{}' as UninterestingTaskName. Starting 
finishing trace will be log as trace", taskName);
         UNINTERESTING_TASK_NAMES.add(taskName);
-        if(registerScheduledPrefix) {
+        if (registerScheduledPrefix) {
             
UNINTERESTING_TASK_NAMES.add(ScheduledTask.prefixScheduledName(taskName));
         }
     }
 
     private final ThreadFactory threadFactory;
-    
+
     private final ThreadFactory daemonThreadFactory;
-    
+
     private final ExecutorService runner;
-        
+
     private final ScheduledExecutorService delayedRunner;
 
     // inefficient having so many records, and also doing searches through ...
     // many things in here could be more efficient however (different types of 
lookup etc),
     // do that when we need to.
-    
+
     //access to this field AND to members in this field is synchronized, 
     //to allow us to preserve order while guaranteeing thread-safe
     //(but more testing is needed before we are completely sure it is 
thread-safe!)
     //synch blocks are as finely grained as possible for efficiency;
     //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it 
easier to remove when a tag is empty
-    private Map<Object,Set<Task<?>>> tasksByTag = new 
HashMap<Object,Set<Task<?>>>();
-    
-    private ConcurrentMap<String,Task<?>> tasksById = new 
ConcurrentHashMap<String,Task<?>>();
+    private Map<Object, Set<Task<?>>> tasksByTag = new HashMap<Object, 
Set<Task<?>>>();
+
+    private ConcurrentMap<String, Task<?>> tasksById = new 
ConcurrentHashMap<String, Task<?>>();
 
     private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new 
ConcurrentHashMap<Object, TaskScheduler>();
 
-    /** count of all tasks submitted, including finished */
+    /**
+     * count of all tasks submitted, including finished
+     */
     private final AtomicLong totalTaskCount = new AtomicLong();
-    
-    /** tasks submitted but not yet done (or in cases of 
interruption/cancelled not yet GC'd) */
+
+    /**
+     * tasks submitted but not yet done (or in cases of interruption/cancelled 
not yet GC'd)
+     */
     private Set<String> incompleteTaskIds = Sets.newConcurrentHashSet();
-    
-    /** tasks started but not yet finished */
+
+    /**
+     * tasks started but not yet finished
+     */
     private final AtomicInteger activeTaskCount = new AtomicInteger();
-    
+
     private final List<ExecutionListener> listeners = new 
CopyOnWriteArrayList<ExecutionListener>();
-    
+
     private final static ThreadLocal<String> threadOriginalName = new 
ThreadLocal<String>() {
         @Override
         protected String initialValue() {
             // should not happen, as only access is in _afterEnd with a check 
that _beforeStart was invoked 
-            log.warn("No original name recorded for thread 
"+Thread.currentThread().getName()+"; task "+Tasks.current());
-            return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8);
+            log.warn("No original name recorded for thread " + 
Thread.currentThread().getName() + "; task " + Tasks.current());
+            return "brooklyn-thread-pool-" + Identifiers.makeRandomId(8);
         }
     };
-    
+
     public BasicExecutionManager(String contextid) {
         threadFactory = newThreadFactory(contextid);
         daemonThreadFactory = new ThreadFactoryBuilder()
                 .setThreadFactory(threadFactory)
                 .setDaemon(true)
                 .build();
-                
+
         // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout 
of 1s rather than 60s for better shutdown!
-        runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, 
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 
+        runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, 
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                 daemonThreadFactory);
-            
+
         delayedRunner = new ScheduledThreadPoolExecutor(1, 
daemonThreadFactory);
 
         if (jitterThreads) {
             log.info("Task startup jittering enabled with a maximum of " + 
jitterThreadsMaxDelay + " delay.");
         }
     }
-    
+
     private final static class UncaughtExceptionHandlerImplementation 
implements Thread.UncaughtExceptionHandler {
         @Override
         public void uncaughtException(Thread t, Throwable e) {
-            log.error("Uncaught exception in thread "+t.getName(), e);
+            log.error("Uncaught exception in thread " + t.getName(), e);
         }
     }
-    
-    /** 
+
+    /**
      * For use by overriders to use custom thread factory.
      * But be extremely careful: called by constructor, so before sub-class' 
constructor will
      * have been invoked!
      */
     protected ThreadFactory newThreadFactory(String contextid) {
         return new ThreadFactoryBuilder()
-                .setNameFormat("brooklyn-execmanager-"+contextid+"-%d")
+                .setNameFormat("brooklyn-execmanager-" + contextid + "-%d")
                 .setUncaughtExceptionHandler(new 
UncaughtExceptionHandlerImplementation())
                 .build();
     }
-    
+
     public void shutdownNow() {
         shutdownNow(null);
     }
-    
-    /** shuts down the executor, and if a duration is supplied awaits 
termination for that long.
+
+    /**
+     * shuts down the executor, and if a duration is supplied awaits 
termination for that long.
+     *
      * @return whether everything is terminated
      */
     @Beta
     public boolean shutdownNow(Duration howLongToWaitForTermination) {
         runner.shutdownNow();
         delayedRunner.shutdownNow();
-        if (howLongToWaitForTermination!=null) {
+        if (howLongToWaitForTermination != null) {
             CountdownTimer timer = 
howLongToWaitForTermination.countdownTimer();
             try {
                 
runner.awaitTermination(timer.getDurationRemaining().toMilliseconds(), 
TimeUnit.MILLISECONDS);
-                if (timer.isLive()) 
delayedRunner.awaitTermination(timer.getDurationRemaining().toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                if (timer.isLive())
+                    
delayedRunner.awaitTermination(timer.getDurationRemaining().toMilliseconds(), 
TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 throw Exceptions.propagate(e);
             }
         }
         return runner.isTerminated() && delayedRunner.isTerminated();
     }
-    
+
     public void addListener(ExecutionListener listener) {
         listeners.add(listener);
     }
-    
+
     public void removeListener(ExecutionListener listener) {
         listeners.remove(listener);
     }
-    
+
     /**
      * Deletes the given tag, including all tasks using this tag.
-     * 
+     * <p>
      * Useful, for example, if an entity is being expunged so that we don't 
keep holding
      * a reference to it as a tag.
      */
@@ -367,9 +389,9 @@ public class BasicExecutionManager implements 
ExecutionManager {
     public void deleteTask(Task<?> task) {
         boolean removed = deleteTaskNonRecursive(task);
         if (!removed) return;
-        
+
         if (task instanceof HasTaskChildren) {
-            List<Task<?>> children = 
ImmutableList.copyOf(((HasTaskChildren)task).getChildren());
+            List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren) 
task).getChildren());
             for (Task<?> child : children) {
                 deleteTask(child);
             }
@@ -391,10 +413,10 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
         Task<?> removed = tasksById.remove(task.getId());
         incompleteTaskIds.remove(task.getId());
-        if (removed!=null && removed.isSubmitted() && !removed.isDone(true)) {
+        if (removed != null && removed.isSubmitted() && !removed.isDone(true)) 
{
             Entity context = BrooklynTaskTags.getContextEntity(removed);
-            if (context!=null && !Entities.isManaged(context)) {
-                log.debug("Deleting active task on unmanagement of 
"+context+": "+removed);
+            if (context != null && !Entities.isManaged(context)) {
+                log.debug("Deleting active task on unmanagement of " + context 
+ ": " + removed);
             } else {
                 if (removed.isDone()) {
                     log.debug("Deleting cancelled task before completion: " + 
removed + "; this task will continue to run in the background outwith " + this);
@@ -419,23 +441,31 @@ public class BasicExecutionManager implements 
ExecutionManager {
     public boolean isShutdown() {
         return runner.isShutdown();
     }
-    
-    /** count of all tasks submitted */
+
+    /**
+     * count of all tasks submitted
+     */
     public long getTotalTasksSubmitted() {
         return totalTaskCount.get();
     }
-    
-    /** count of tasks submitted but not ended */
+
+    /**
+     * count of tasks submitted but not ended
+     */
     public long getNumIncompleteTasks() {
         return incompleteTaskIds.size();
     }
-    
-    /** count of tasks started but not ended */
+
+    /**
+     * count of tasks started but not ended
+     */
     public long getNumActiveTasks() {
         return activeTaskCount.get();
     }
 
-    /** count of tasks kept in memory, often including ended tasks */
+    /**
+     * count of tasks kept in memory, often including ended tasks
+     */
     public long getNumInMemoryTasks() {
         return tasksById.size();
     }
@@ -444,7 +474,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
         Preconditions.checkNotNull(tag);
         synchronized (tasksByTag) {
             Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
-            if (result==null) {
+            if (result == null) {
                 result = Collections.synchronizedSet(new 
LinkedHashSet<Task<?>>());
                 tasksByTag.put(tag, result);
             }
@@ -452,7 +482,9 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
     }
 
-    /** exposes live view, for internal use only */
+    /**
+     * exposes live view, for internal use only
+     */
     @Beta
     public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
         synchronized (tasksByTag) {
@@ -464,8 +496,10 @@ public class BasicExecutionManager implements 
ExecutionManager {
     public Task<?> getTask(String id) {
         return tasksById.get(id);
     }
-    
-    /** not on interface because potentially expensive */
+
+    /**
+     * not on interface because potentially expensive
+     */
     public List<Task<?>> getAllTasks() {
         // not sure if synching makes any difference; have not observed CME's 
yet
         // (and so far this is only called when a CME was caught on a previous 
operation)
@@ -473,23 +507,23 @@ public class BasicExecutionManager implements 
ExecutionManager {
             return MutableList.copyOf(tasksById.values());
         }
     }
-    
+
     @Override
     public Set<Task<?>> getTasksWithTag(Object tag) {
         Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
-        if (result==null) return Collections.emptySet();
+        if (result == null) return Collections.emptySet();
         synchronized (result) {
             return Collections.unmodifiableSet(new 
LinkedHashSet<Task<?>>(result));
         }
     }
-    
+
     @Override
     public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) {
         Set<Task<?>> result = new LinkedHashSet<Task<?>>();
         Iterator<?> ti = tags.iterator();
         while (ti.hasNext()) {
             Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next());
-            if (tasksForTag!=null) {
+            if (tasksForTag != null) {
                 synchronized (tasksForTag) {
                     result.addAll(tasksForTag);
                 }
@@ -498,7 +532,9 @@ public class BasicExecutionManager implements 
ExecutionManager {
         return Collections.unmodifiableSet(result);
     }
 
-    /** only works with at least one tag; returns empty if no tags */
+    /**
+     * only works with at least one tag; returns empty if no tags
+     */
     @Override
     public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) {
         //NB: for this method retrieval for multiple tags could be made (much) 
more efficient (if/when it is used with multiple tags!)
@@ -509,7 +545,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
         Iterator<?> ti = tags.iterator();
         while (ti.hasNext()) {
             Object tag = ti.next();
-            if (first) { 
+            if (first) {
                 first = false;
                 result.addAll(getTasksWithTag(tag));
             } else {
@@ -519,49 +555,83 @@ public class BasicExecutionManager implements 
ExecutionManager {
         return Collections.unmodifiableSet(result);
     }
 
-    /** live view of all tasks, for internal use only */
+    /**
+     * live view of all tasks, for internal use only
+     */
     @Beta
-    public Collection<Task<?>> allTasksLive() { return tasksById.values(); }
-    
+    public Collection<Task<?>> allTasksLive() {
+        return tasksById.values();
+    }
+
     @Override
-    public Set<Object> getTaskTags() { 
+    public Set<Object> getTaskTags() {
         synchronized (tasksByTag) {
-            return 
Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); 
+            return 
Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet()));
         }
     }
 
-    @Override @Deprecated public Task<?> submit(Runnable r) { return 
submit(new LinkedHashMap<Object,Object>(1), r); }
-    @Override public Task<?> submit(String displayName, Runnable r) { return 
submit(MutableMap.of("displayName", displayName), r); }
-    @Override public Task<?> submit(Map<?,?> flags, Runnable r) { return 
submit(flags, new BasicTask<Void>(flags, r)); }
+    @Override
+    @Deprecated
+    public Task<?> submit(Runnable r) {
+        return submit(new LinkedHashMap<Object, Object>(1), r);
+    }
+
+    @Override
+    public Task<?> submit(String displayName, Runnable r) {
+        return submit(MutableMap.of("displayName", displayName), r);
+    }
+
+    @Override
+    public Task<?> submit(Map<?, ?> flags, Runnable r) {
+        return submit(flags, new BasicTask<Void>(flags, r));
+    }
+
+    @Override
+    @Deprecated
+    public <T> Task<T> submit(Callable<T> c) {
+        return submit(new LinkedHashMap<Object, Object>(1), c);
+    }
+
+    @Override
+    public <T> Task<T> submit(String displayName, Callable<T> c) {
+        return submit(MutableMap.of("displayName", displayName), c);
+    }
 
-    @Override @Deprecated public <T> Task<T> submit(Callable<T> c) { return 
submit(new LinkedHashMap<Object,Object>(1), c); }
-    @Override public <T> Task<T> submit(String displayName, Callable<T> c) { 
return submit(MutableMap.of("displayName", displayName), c); }
-    @Override public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { 
return submit(flags, new BasicTask<T>(flags, c)); }
+    @Override
+    public <T> Task<T> submit(Map<?, ?> flags, Callable<T> c) {
+        return submit(flags, new BasicTask<T>(flags, c));
+    }
 
-    @Override public <T> Task<T> submit(TaskAdaptable<T> t) { return 
submit(new LinkedHashMap<Object,Object>(1), t); }
+    @Override
+    public <T> Task<T> submit(TaskAdaptable<T> t) {
+        return submit(new LinkedHashMap<Object, Object>(1), t);
+    }
 
     @Override
-    public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) {
+    public <T> Task<T> submit(Map<?, ?> flags, TaskAdaptable<T> task) {
         if (!(task instanceof Task))
             task = task.asTask();
         synchronized (task) {
-            if (((TaskInternal<?>)task).getInternalFuture()!=null) return 
(Task<T>)task;
+            if (((TaskInternal<?>) task).getInternalFuture() != null) return 
(Task<T>) task;
             return submitNewTask(flags, (Task<T>) task);
         }
     }
 
-    public <T> Task<T> scheduleWith(Task<T> task) { return 
scheduleWith(Collections.emptyMap(), task); }
-    public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) {
+    public <T> Task<T> scheduleWith(Task<T> task) {
+        return scheduleWith(Collections.emptyMap(), task);
+    }
+
+    public <T> Task<T> scheduleWith(Map<?, ?> flags, Task<T> task) {
         synchronized (task) {
-            if (((TaskInternal<?>)task).getInternalFuture()!=null) return task;
+            if (((TaskInternal<?>) task).getInternalFuture() != null) return 
task;
             return submitNewTask(flags, task);
         }
     }
 
-    protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final 
ScheduledTask task) {
+    protected Task<?> submitNewScheduledTask(final Map<?, ?> flags, final 
ScheduledTask task) {
         boolean result = false;
         try {
-            BrooklynTaskLoggingMdc.logEvent("Submitting scheduled task", task, 
BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()));
+            BrooklynTaskLoggingMdc.logStartEvent("Submitting scheduled task", 
task, null);
             result = submitSubsequentScheduledTask(flags, task);
         } finally {
             if (!result) {
@@ -570,11 +640,11 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
         return task;
     }
-    
-    private boolean submitSubsequentScheduledTask(final Map<?,?> flags, final 
ScheduledTask task) {
+
+    private boolean submitSubsequentScheduledTask(final Map<?, ?> flags, final 
ScheduledTask task) {
         if (!task.isDone()) {
             task.internalFuture = delayedRunner.schedule(new 
ScheduledTaskCallable(task, flags),
-                task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
+                    task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
             return true;
         } else {
             return false;
@@ -583,7 +653,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
 
     protected class ScheduledTaskCallable implements Callable<Object> {
         public ScheduledTask task;
-        public Map<?,?> flags;
+        public Map<?, ?> flags;
 
         public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) {
             this.task = task;
@@ -591,12 +661,12 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
 
         @Override
-        @SuppressWarnings({ "rawtypes", "unchecked" })
+        @SuppressWarnings({"rawtypes", "unchecked"})
         public Object call() {
             TaskInternal<?> taskIteration = null;
             Throwable error = null;
             try {
-                if (task.startTimeUtc==-1) {
+                if (task.startTimeUtc == -1) {
                     beforeSubmitScheduledTaskAllIterations(flags, task);
                     beforeStartScheduledTaskAllIterations(flags, task);
 
@@ -610,57 +680,60 @@ public class BasicExecutionManager implements 
ExecutionManager {
 
                 final Callable<?> oldJob = taskIteration.getJob();
                 final TaskInternal<?> taskIterationF = taskIteration;
-                taskIteration.setJob(new Callable() { @Override public Object 
call() {
-                    if (task.isCancelled()) {
-                        CancellationException cancelDetected = new 
CancellationException("cancel detected");
-                        try {
-                            afterEndScheduledTaskSubmissionIteration(flags, 
task, taskIterationF, cancelDetected);
-                        } finally {
-                            // do in finally block so runs even if above 
throws cancelDetected
-                            afterEndScheduledTaskAllIterations(flags, task, 
cancelDetected);
-                        }
-                        throw cancelDetected;
-                    }
-                    Throwable lastError = null;
-                    boolean shouldResubmit = true;
-                    task.recentRun = taskIterationF;
-                    try (BrooklynTaskLoggingMdc mdc = 
BrooklynTaskLoggingMdc.create(taskIterationF).start()) {
-                        beforeStartScheduledTaskSubmissionIteration(flags, 
task, taskIterationF);
-                        synchronized (task) {
-                            task.notifyAll();
-                        }
-                        Object result;
-                        try {
-                            result = oldJob.call();
-                            task.lastThrownType = null;
-                        } catch (Exception e) {
-                            lastError = e;
-                            shouldResubmit = shouldResubmitOnException(oldJob, 
e);
-                            throw Exceptions.propagate(e);
+                taskIteration.setJob(new Callable() {
+                    @Override
+                    public Object call() {
+                        if (task.isCancelled()) {
+                            CancellationException cancelDetected = new 
CancellationException("cancel detected in scheduled job for " + task);
+                            try {
+                                
afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, 
cancelDetected);
+                            } finally {
+                                // do in finally block so runs even if above 
throws cancelDetected
+                                afterEndScheduledTaskAllIterations(flags, 
task, cancelDetected);
+                            }
+                            throw cancelDetected;
                         }
-                        return result;
-                    } finally {
-                        if (!task.isCancelled() || task.getEndTimeUtc()<=0) {
-                            // don't re-run on cancellation
-
-                            afterEndScheduledTaskSubmissionIteration(flags, 
task, taskIterationF, lastError);
-                            // do in finally block in case we were interrupted
-                            if (shouldResubmit && resubmit()) {
-                                // resubmitted fine, no-op
-                            } else {
-                                // not resubmitted, note ending
-                                afterEndScheduledTaskAllIterations(flags, 
task, lastError);
+                        Throwable lastError = null;
+                        boolean shouldResubmit = true;
+                        task.recentRun = taskIterationF;
+                        try (BrooklynTaskLoggingMdc mdc = 
BrooklynTaskLoggingMdc.create(taskIterationF).start()) {
+                            beforeStartScheduledTaskSubmissionIteration(flags, 
task, taskIterationF);
+                            synchronized (task) {
+                                task.notifyAll();
+                            }
+                            Object result;
+                            try {
+                                result = oldJob.call();
+                                task.lastThrownType = null;
+                            } catch (Exception e) {
+                                lastError = e;
+                                shouldResubmit = 
shouldResubmitOnException(oldJob, e);
+                                throw Exceptions.propagate(e);
+                            }
+                            return result;
+                        } finally {
+                            if (!task.isCancelled() || task.getEndTimeUtc() <= 
0) {
+                                // don't re-run on cancellation
+
+                                
afterEndScheduledTaskSubmissionIteration(flags, task, taskIterationF, 
lastError);
+                                // do in finally block in case we were 
interrupted
+                                if (shouldResubmit && resubmit()) {
+                                    // resubmitted fine, no-op
+                                } else {
+                                    // not resubmitted, note ending
+                                    afterEndScheduledTaskAllIterations(flags, 
task, lastError);
+                                }
                             }
                         }
                     }
-                }});
+                });
                 task.nextRun = taskIteration;
                 ExecutionContext ec =
                         // no longer associated the execution context on each 
execution;
 //                         BasicExecutionContext.getCurrentExecutionContext();
                         // instead it is set on the task
                         task.executionContext;
-                if (ec!=null) return ec.submit(taskIteration);
+                if (ec != null) return ec.submit(taskIteration);
                 else return submit(taskIteration);
 
             } catch (Exception e) {
@@ -672,7 +745,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
 
         private boolean resubmit() {
             task.runCount++;
-            if (task.period!=null && !task.isCancelled()) {
+            if (task.period != null && !task.isCancelled()) {
                 task.delay = task.period;
                 return submitSubsequentScheduledTask(flags, task);
             } else {
@@ -704,7 +777,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
 
         @Override
         public String toString() {
-            return "ScheduledTaskCallable["+task+","+flags+"]";
+            return "ScheduledTaskCallable[" + task + "," + flags + "]";
         }
     }
 
@@ -727,8 +800,8 @@ public class BasicExecutionManager implements 
ExecutionManager {
                     afterEndForCancelBeforeStart(flags, task, false);
                     throw new CancellationException();
                 }
-                result = ((TaskInternal<T>)task).getJob().call();
-            } catch(Throwable e) {
+                result = ((TaskInternal<T>) task).getJob().call();
+            } catch (Throwable e) {
                 error = e;
             } finally {
                 afterEndAtomicTask(flags, task, error);
@@ -738,7 +811,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
 
         @Override
         public String toString() {
-            return "BEM.call("+task+","+flags+")";
+            return "BEM.call(" + task + "," + flags + ")";
         }
     }
 
@@ -746,7 +819,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
         private final Task<T> task;
         private BasicExecutionManager execMgmt;
         private final ExecutionList listeners;
-        
+
         private 
CancellingListenableForwardingFutureForTask(BasicExecutionManager execMgmt, 
Future<T> delegate, ExecutionList list, Task<T> task) {
             super(delegate);
             this.listeners = list;
@@ -767,48 +840,48 @@ public class BasicExecutionManager implements 
ExecutionManager {
         public ExecutionList getListeners() {
             return listeners;
         }
-        
+
         @Override
         public boolean cancel(TaskCancellationMode mode) {
             boolean result = false;
             if (log.isTraceEnabled()) {
-                log.trace("CLFFT cancelling "+task+" mode "+mode);
+                log.trace("CLFFT cancelling " + task + " mode " + mode);
             }
-            if (!task.isCancelled()) result |= 
((TaskInternal<T>)task).cancel(mode);
+            if (!task.isCancelled()) result |= ((TaskInternal<T>) 
task).cancel(mode);
             result |= delegate().cancel(mode.isAllowedToInterruptTask());
-            
+
             if (mode.isAllowedToInterruptDependentSubmittedTasks()) {
-                int subtasksFound=0;
-                int subtasksReallyCancelled=0;
-                
+                int subtasksFound = 0;
+                int subtasksReallyCancelled = 0;
+
                 if (task instanceof HasTaskChildren) {
                     // cancel tasks in reverse order --
                     // it should be the case that if child1 is cancelled,
                     // a parentTask should NOT call a subsequent child2,
                     // but just in case, we cancel child2 first
                     // NB: DST and others may apply their own recursive cancel 
behaviour
-                    MutableList<Task<?>> childrenReversed = 
MutableList.copyOf( ((HasTaskChildren)task).getChildren() );
+                    MutableList<Task<?>> childrenReversed = 
MutableList.copyOf(((HasTaskChildren) task).getChildren());
                     Collections.reverse(childrenReversed);
-                    
-                    for (Task<?> child: childrenReversed) {
+
+                    for (Task<?> child : childrenReversed) {
                         if (log.isTraceEnabled()) {
-                            log.trace("Cancelling "+child+" on recursive 
cancellation of "+task);
+                            log.trace("Cancelling " + child + " on recursive 
cancellation of " + task);
                         }
                         subtasksFound++;
-                        if (((TaskInternal<?>)child).cancel(mode)) {
+                        if (((TaskInternal<?>) child).cancel(mode)) {
                             result = true;
                             subtasksReallyCancelled++;
                         }
                     }
                 }
-                for (Task<?> t: execMgmt.getAllTasks()) {
+                for (Task<?> t : execMgmt.getAllTasks()) {
                     if (task.equals(t.getSubmittedByTask())) {
                         if (mode.isAllowedToInterruptAllSubmittedTasks() || 
BrooklynTaskTags.isTransient(t)) {
                             if (log.isTraceEnabled()) {
-                                log.trace("Cancelling "+t+" on recursive 
cancellation of "+task);
+                                log.trace("Cancelling " + t + " on recursive 
cancellation of " + task);
                             }
                             subtasksFound++;
-                            if (((TaskInternal<?>)t).cancel(mode)) {
+                            if (((TaskInternal<?>) t).cancel(mode)) {
                                 result = true;
                                 subtasksReallyCancelled++;
                             }
@@ -816,10 +889,10 @@ public class BasicExecutionManager implements 
ExecutionManager {
                     }
                 }
                 if (log.isTraceEnabled()) {
-                    log.trace("On cancel of "+task+", applicable subtask count 
"+subtasksFound+", of which "+subtasksReallyCancelled+" were actively 
cancelled");
+                    log.trace("On cancel of " + task + ", applicable subtask 
count " + subtasksFound + ", of which " + subtasksReallyCancelled + " were 
actively cancelled");
                 }
             }
-  
+
             execMgmt.afterEndForCancelBeforeStart(null, task, true);
             return result;
         }
@@ -841,7 +914,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 try {
                     listener.onTaskDone(task);
                 } catch (Exception e) {
-                    log.warn("Error running execution listener "+listener+" of 
task "+task+" done", e);
+                    log.warn("Error running execution listener " + listener + 
" of task " + task + " done", e);
                 }
             }
             // run any listeners the task owner has added to its future
@@ -850,46 +923,47 @@ public class BasicExecutionManager implements 
ExecutionManager {
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> 
task) {
+    protected <T> Task<T> submitNewTask(final Map<?, ?> flags, final Task<T> 
task) {
         if (log.isTraceEnabled()) {
-            log.trace("Submitting task {} ({}), with flags {}, and tags {}, 
job {}; caller {}", 
-                new Object[] {task.getId(), task, Sanitizer.sanitize(flags), 
BrooklynTaskTags.getTagsFast(task), 
-                (task instanceof TaskInternal ? 
((TaskInternal<T>)task).getJob() : "<unavailable>"),
-                Tasks.current() });
-            if (Tasks.current()==null && BrooklynTaskTags.isTransient(task)) {
-                log.trace("Stack trace for unparented submission of transient 
"+task, new Throwable("trace only (not an error)"));
+            log.trace("Submitting task {} ({}), with flags {}, and tags {}, 
job {}; caller {}",
+                    new Object[]{task.getId(), task, 
Sanitizer.sanitize(flags), BrooklynTaskTags.getTagsFast(task),
+                            (task instanceof TaskInternal ? ((TaskInternal<T>) 
task).getJob() : "<unavailable>"),
+                            Tasks.current()});
+            if (Tasks.current() == null && BrooklynTaskTags.isTransient(task)) 
{
+                log.trace("Stack trace for unparented submission of transient 
" + task, new Throwable("trace only (not an error)"));
             }
         }
-        
+
         if (task instanceof ScheduledTask) {
             return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask) 
task);
         }
-        
+
         beforeSubmitAtomicTask(flags, task);
-        
-        if (((TaskInternal<T>)task).getJob() == null) 
-            throw new NullPointerException("Task "+task+" submitted with with 
null job: job must be supplied.");
-        
+
+        if (((TaskInternal<T>) task).getJob() == null)
+            throw new NullPointerException("Task " + task + " submitted with 
with null job: job must be supplied.");
+
         Callable<T> job = new SubmissionCallable<T>(flags, task);
-        
+
         // If there's a scheduler then use that; otherwise execute it directly
         Set<TaskScheduler> schedulers = null;
-        for (Object tago: BrooklynTaskTags.getTagsFast(task)) {
+        for (Object tago : BrooklynTaskTags.getTagsFast(task)) {
             TaskScheduler scheduler = getTaskSchedulerForTag(tago);
-            if (scheduler!=null) {
-                if (schedulers==null) schedulers = new 
LinkedHashSet<TaskScheduler>(2);
+            if (scheduler != null) {
+                if (schedulers == null) schedulers = new 
LinkedHashSet<TaskScheduler>(2);
                 schedulers.add(scheduler);
             }
         }
         Future<T> future;
-        if (schedulers!=null && !schedulers.isEmpty()) {
-            if (schedulers.size()>1) log.warn("multiple schedulers detected, 
using only the first, for "+task+": "+schedulers);
+        if (schedulers != null && !schedulers.isEmpty()) {
+            if (schedulers.size() > 1)
+                log.warn("multiple schedulers detected, using only the first, 
for " + task + ": " + schedulers);
             future = schedulers.iterator().next().submit(job);
         } else {
             future = runner.submit(job);
         }
         afterSubmitRecordFuture(task, future);
-        
+
         return task;
     }
 
@@ -898,117 +972,136 @@ public class BasicExecutionManager implements 
ExecutionManager {
         // this future allows a caller to add custom listeners
         // (it does not notify the listeners; that's our job);
         // except on cancel we want to listen
-        CancellingListenableForwardingFutureForTask<T> listenableFuture = new 
CancellingListenableForwardingFutureForTask<T>(this, future, 
((TaskInternal<T>)task).getListeners(), task);
+        CancellingListenableForwardingFutureForTask<T> listenableFuture = new 
CancellingListenableForwardingFutureForTask<T>(this, future, ((TaskInternal<T>) 
task).getListeners(), task);
         // and we want to make sure *our* (manager) listeners are given 
suitable callback 
-        ((TaskInternal<T>)task).addListener(new 
SubmissionListenerToCallManagerListeners<T>(task, listenableFuture), runner);
+        ((TaskInternal<T>) task).addListener(new 
SubmissionListenerToCallManagerListeners<T>(task, listenableFuture), runner);
         // NB: can the above mean multiple callbacks to 
TaskInternal#runListeners?
-        
+
         // finally expose the future to callers
-        ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
+        ((TaskInternal<T>) task).initInternalFuture(listenableFuture);
     }
-    
-    protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, 
Task<?> task) {
+
+    protected void beforeSubmitScheduledTaskAllIterations(Map<?, ?> flags, 
Task<?> task) {
         // for these, beforeSubmitAtomicTask is not called,
         // but beforeStartAtomic and afterSubmitAtomic _are_ called
         internalBeforeSubmit(flags, task);
     }
-    protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?,?> 
flags, Task<?> task) {
+
+    protected void beforeSubmitScheduledTaskSubmissionIteration(Map<?, ?> 
flags, Task<?> task) {
         internalBeforeSubmit(flags, task);
     }
-    protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
+
+    protected void beforeSubmitAtomicTask(Map<?, ?> flags, Task<?> task) {
         internalBeforeSubmit(flags, task);
     }
-    protected void beforeSubmitInSameThreadTask(Map<?,?> flags, Task<?> task) {
+
+    protected void beforeSubmitInSameThreadTask(Map<?, ?> flags, Task<?> task) 
{
         internalBeforeSubmit(flags, task);
     }
-    /** invoked when a task is submitted */
-    protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) {
+
+    /**
+     * invoked when a task is submitted
+     */
+    protected void internalBeforeSubmit(Map<?, ?> flags, Task<?> task) {
         incompleteTaskIds.add(task.getId());
-        
-        if (task.getSubmittedByTaskId()==null) {
+
+        if (task.getSubmittedByTaskId() == null) {
             Task<?> currentTask = Tasks.current();
-            if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(
+            if (currentTask != null) ((TaskInternal<?>) 
task).setSubmittedByTask(
                     // do this instead of soft reference (2017-09) as soft 
refs impact GC 
                     Maybe.of(new TaskLookup(this, currentTask)),
                     currentTask.getId());
         }
-        ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis());
-        
-        if (flags!=null && flags.get("tag")!=null) 
((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag"));
-        if (flags!=null && flags.get("tags")!=null) 
((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags"));
+        ((TaskInternal<?>) task).setSubmitTimeUtc(System.currentTimeMillis());
 
-        for (Object tag: BrooklynTaskTags.getTagsFast(task)) {
+        if (flags != null && flags.get("tag") != null)
+            ((TaskInternal<?>) task).getMutableTags().add(flags.remove("tag"));
+        if (flags != null && flags.get("tags") != null)
+            ((TaskInternal<?>) task).getMutableTags().addAll((Collection<?>) 
flags.remove("tags"));
+
+        for (Object tag : BrooklynTaskTags.getTagsFast(task)) {
             tasksWithTagCreating(tag).add(task);
         }
-        
+
         tasksById.put(task.getId(), task);
         totalTaskCount.incrementAndGet();
     }
-    
+
     private static class TaskLookup implements Supplier<Task<?>> {
         // this class is not meant to be serialized, but if it is, make sure 
exec mgr doesn't sneak in
         transient BasicExecutionManager mgr;
-        
+
         String id;
         String displayName;
+
         public TaskLookup(BasicExecutionManager mgr, Task<?> t) {
             this.mgr = mgr;
             id = t.getId();
-            if (mgr.getTask(id)==null) {
-                log.warn("Created task lookup for task which is not 
registered: "+t);
+            if (mgr.getTask(id) == null) {
+                log.warn("Created task lookup for task which is not 
registered: " + t);
             }
             displayName = t.getDisplayName();
         }
+
         @Override
         public Task<?> get() {
-            if (mgr==null) return gone();
+            if (mgr == null) return gone();
             Task<?> result = mgr.getTask(id);
-            if (result!=null) return result;
+            if (result != null) return result;
             return gone();
         }
+
         private <T> Task<T> gone() {
-            Task<T> t = 
Tasks.<T>builder().dynamic(false).displayName(displayName+" (placeholder for 
"+id+")")
-                .description("Details of the original task have been 
forgotten.")
-                .body(Callables.returning((T)null)).build();
+            Task<T> t = 
Tasks.<T>builder().dynamic(false).displayName(displayName + " (placeholder for 
" + id + ")")
+                    .description("Details of the original task have been 
forgotten.")
+                    .body(Callables.returning((T) null)).build();
             // don't really want anyone executing the "gone" task...
             // also if we are GC'ing tasks then cancelled may help with 
cleanup 
             // of sub-tasks that have lost their submitted-by-task reference ?
             // also don't want warnings when it's finalized, this means we 
don't need ignoreIfNotRun()
-            ((BasicTask<T>)t).cancelled = true;
+            ((BasicTask<T>) t).cancelled = true;
             return t;
         }
     }
 
-    /** normally (if not interrupted) called once for each call to {@link 
#beforeSubmitScheduledTaskAllIterations(Map, Task)} */
-    protected void beforeStartScheduledTaskAllIterations(Map<?,?> flags, 
ScheduledTask taskDoingTheInitialSchedule) {
-        taskDoingTheInitialSchedule.mdc = 
BrooklynTaskLoggingMdc.create(taskDoingTheInitialSchedule).start();
+    /**
+     * normally (if not interrupted) called once for each call to {@link 
#beforeSubmitScheduledTaskAllIterations(Map, Task)}
+     */
+    protected void beforeStartScheduledTaskAllIterations(Map<?, ?> flags, 
ScheduledTask taskDoingTheInitialSchedule) {
+        BrooklynTaskLoggingMdc.logStartEvent("Starting scheduled task 
iterations", taskDoingTheInitialSchedule, null);
 
         internalBeforeStart(flags, taskDoingTheInitialSchedule, 
!SCHEDULED_TASKS_COUNT_AS_ACTIVE, true, true);
     }
-    protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, 
Task<?> taskDoingTheScheduling, Task<?> taskIteration) {
+
+    protected void beforeStartScheduledTaskSubmissionIteration(Map<?, ?> 
flags, Task<?> taskDoingTheScheduling, Task<?> taskIteration) {
         // no-op, because handled as an atomic task
         // internalBeforeStart(flags, taskIteration, true, false);
     }
-    protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
+
+    protected void beforeStartAtomicTask(Map<?, ?> flags, Task<?> task) {
         internalBeforeStart(flags, task, false, true, false);
     }
-    protected void beforeStartInSameThreadTask(Map<?,?> flags, Task<?> task) {
+
+    protected void beforeStartInSameThreadTask(Map<?, ?> flags, Task<?> task) {
         internalBeforeStart(flags, task, false, false, false);
     }
-    
-    /** invoked in a task's thread when a task is starting to run (may be some 
time after submitted), 
-     * but before doing any of the task's work, so that we can update 
bookkeeping and notify callbacks */
-    protected void internalBeforeStart(Map<?,?> flags, Task<?> task, boolean 
skipIncrementCounter, boolean allowJitter, boolean 
startingThisThreadMightEndElsewhere) {
+
+    /**
+     * invoked in a task's thread when a task is starting to run (may be some 
time after submitted),
+     * but before doing any of the task's work, so that we can update 
bookkeeping and notify callbacks
+     */
+    protected void internalBeforeStart(Map<?, ?> flags, Task<?> task, boolean 
skipIncrementCounter, boolean allowJitter, boolean 
startingThisThreadMightEndElsewhere) {
         int count = skipIncrementCounter ? activeTaskCount.get() : 
activeTaskCount.incrementAndGet();
-        if (count % 1000==0 && count>0) {
-            log.warn("High number of active tasks: task #"+count+" is "+task);
+        if (count % 1000 == 0 && count > 0) {
+            log.warn("High number of active tasks: task #" + count + " is " + 
task);
         }
-        
+
         //set thread _before_ start time, so we won't get a null thread when 
there is a start-time
-        if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: 
"+task + " running on thread " + Thread.currentThread().getName());
+        if (log.isTraceEnabled())
+            log.trace("" + this + " beforeStart, task: " + task + " running on 
thread " + Thread.currentThread().getName());
         if (!task.isCancelled()) {
             Thread thread = Thread.currentThread();
-            ((TaskInternal<?>)task).setThread(thread);
+            ((TaskInternal<?>) task).setThread(thread);
             if (!startingThisThreadMightEndElsewhere) {
                 if (RENAME_THREADS) {
                     threadOriginalName.set(thread.getName());
@@ -1017,13 +1110,13 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 }
                 PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
             }
-            
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
+            ((TaskInternal<?>) 
task).setStartTimeUtc(System.currentTimeMillis());
         }
 
         if (allowJitter) {
             jitterThreadStart(task);
         }
-        if (flags!=null && !startingThisThreadMightEndElsewhere) {
+        if (flags != null && !startingThisThreadMightEndElsewhere) {
             invokeCallback(flags.get("newTaskStartCallback"), task);
         }
     }
@@ -1039,7 +1132,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings({"unchecked", "rawtypes"})
     // not ideal, such loose typing on the callback -- should prefer 
Function<Task,Object>
     // but at least it's package-private
     static Object invokeCallback(Object callable, Task<?> task) {
@@ -1048,60 +1141,75 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 log.warn("Use of groovy.lang.Closure is deprecated, in 
ExecutionManager.invokeCallback");
                 loggedClosureDeprecatedInInvokeCallback = true;
             }
-            return ((Closure<?>)callable).call(task);
+            return ((Closure<?>) callable).call(task);
         }
         if (callable instanceof Callable) {
             try {
-                return ((Callable<?>)callable).call();
+                return ((Callable<?>) callable).call();
             } catch (Throwable t) {
                 throw Exceptions.propagate(t);
             }
         }
-        if (callable instanceof Runnable) { ((Runnable)callable).run(); return 
null; }
-        if (callable instanceof Function) { return 
((Function)callable).apply(task); }
-        if (callable==null) return null;
-        throw new IllegalArgumentException("Cannot invoke unexpected callback 
object "+callable+" of type "+callable.getClass()+" on "+task);
+        if (callable instanceof Runnable) {
+            ((Runnable) callable).run();
+            return null;
+        }
+        if (callable instanceof Function) {
+            return ((Function) callable).apply(task);
+        }
+        if (callable == null) return null;
+        throw new IllegalArgumentException("Cannot invoke unexpected callback 
object " + callable + " of type " + callable.getClass() + " on " + task);
     }
+
     private static boolean loggedClosureDeprecatedInInvokeCallback;
-    
-    /** normally (if not interrupted) called once for each call to {@link 
#beforeStartScheduledTaskAllIterations(Map, ScheduledTask)}  */
-    protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, 
ScheduledTask taskDoingTheInitialSchedule, Throwable error) {
+
+    /**
+     * normally (if not interrupted) called once for each call to {@link 
#beforeStartScheduledTaskAllIterations(Map, ScheduledTask)}
+     */
+    protected void afterEndScheduledTaskAllIterations(Map<?, ?> flags, 
ScheduledTask taskDoingTheInitialSchedule, Throwable error) {
         boolean taskWasSubmittedAndNotYetEnded = true;
         try {
             taskWasSubmittedAndNotYetEnded = internalAfterEnd(flags, 
taskDoingTheInitialSchedule, !SCHEDULED_TASKS_COUNT_AS_ACTIVE, false, error);
         } finally {
-            if (taskDoingTheInitialSchedule.mdc!=null) {
-                taskDoingTheInitialSchedule.mdc.close();
-                taskDoingTheInitialSchedule.mdc = null;
+            BrooklynTaskLoggingMdc.logEndEvent("Ending scheduled task 
iterations", taskDoingTheInitialSchedule);
+            synchronized (taskDoingTheInitialSchedule) {
+                taskDoingTheInitialSchedule.notifyAll();
             }
-            synchronized (taskDoingTheInitialSchedule) { 
taskDoingTheInitialSchedule.notifyAll(); }
             if (taskWasSubmittedAndNotYetEnded) {
                 // prevent from running twice on cancellation after start
                 ((TaskInternal<?>) taskDoingTheInitialSchedule).runListeners();
             }
         }
     }
-    /** called once for each call to {@link 
#beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)},
-     * with a per-iteration task generated by the surrounding scheduled task */
-    protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, 
Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) {
+
+    /**
+     * called once for each call to {@link 
#beforeStartScheduledTaskSubmissionIteration(Map, Task, Task)},
+     * with a per-iteration task generated by the surrounding scheduled task
+     */
+    protected void afterEndScheduledTaskSubmissionIteration(Map<?, ?> flags, 
Task<?> taskDoingTheInitialSchedule, Task<?> taskIteration, Throwable error) {
         // no-op because handled as an atomic task
         // internalAfterEnd(flags, taskIteration, false, true, error);
     }
-    /** called once for each task on which {@link #beforeStartAtomicTask(Map, 
Task)} is invoked,
-     * and normally (if not interrupted prior to start) 
-     * called once for each task on which {@link #beforeSubmitAtomicTask(Map, 
Task)} */
-    protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task, Throwable 
error) {
+
+    /**
+     * called once for each task on which {@link #beforeStartAtomicTask(Map, 
Task)} is invoked,
+     * and normally (if not interrupted prior to start)
+     * called once for each task on which {@link #beforeSubmitAtomicTask(Map, 
Task)}
+     */
+    protected void afterEndAtomicTask(Map<?, ?> flags, Task<?> task, Throwable 
error) {
         internalAfterEnd(flags, task, false, true, error);
     }
-    protected void afterEndInSameThreadTask(Map<?,?> flags, Task<?> task, 
Throwable error) {
+
+    protected void afterEndInSameThreadTask(Map<?, ?> flags, Task<?> task, 
Throwable error) {
         internalAfterEnd(flags, task, false, true, error);
     }
-    protected void afterEndForCancelBeforeStart(Map<?,?> flags, Task<?> task, 
boolean calledFromCanceller) {
+
+    protected void afterEndForCancelBeforeStart(Map<?, ?> flags, Task<?> task, 
boolean calledFromCanceller) {
         if (calledFromCanceller) {
             if (task.isBegun()) {
                 // do nothing from canceller thread if task has begin;
                 // we don't want to set end time or listeners prematurely.
-                return ;
+                return;
             } else {
                 // normally task won't be submitted by executor, so do some of 
the end operations.
                 // there is a chance task has begun but not set start time,
@@ -1113,15 +1221,17 @@ public class BasicExecutionManager implements 
ExecutionManager {
         }
         internalAfterEnd(flags, task, true, !calledFromCanceller, null);
     }
-    
-    /** normally (if not interrupted) called once for each call to {@link 
#internalBeforeSubmit(Map, Task)},
-     * and, for atomic tasks and scheduled-task submission iterations where 
+
+    /**
+     * normally (if not interrupted) called once for each call to {@link 
#internalBeforeSubmit(Map, Task)},
+     * and, for atomic tasks and scheduled-task submission iterations where
      * always called once if {@link #internalBeforeStart(Map, Task, boolean, 
boolean, boolean)} is invoked and if possible
-     * (but not possible for isEndingAllIterations) in the same thread as that 
method */
-    protected boolean internalAfterEnd(Map<?,?> flags, Task<?> task, boolean 
skipDecrementCounter, boolean 
startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) {
+     * (but not possible for isEndingAllIterations) in the same thread as that 
method
+     */
+    protected boolean internalAfterEnd(Map<?, ?> flags, Task<?> task, boolean 
skipDecrementCounter, boolean 
startedGuaranteedToEndInSameThreadAndEndingSameThread, Throwable error) {
         boolean taskWasSubmittedAndNotYetEnded = true;
         try {
-            if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
+            if (log.isTraceEnabled()) log.trace(this + " afterEnd, task: " + 
task);
             taskWasSubmittedAndNotYetEnded = 
incompleteTaskIds.remove(task.getId());
             // this method might be called more than once, eg if cancelled, so 
use the above as a guard where single invocation is needed (eg counts)
 
@@ -1129,13 +1239,13 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 activeTaskCount.decrementAndGet();
             }
 
-            if (flags!=null && taskWasSubmittedAndNotYetEnded && 
startedGuaranteedToEndInSameThreadAndEndingSameThread) {
+            if (flags != null && taskWasSubmittedAndNotYetEnded && 
startedGuaranteedToEndInSameThreadAndEndingSameThread) {
                 invokeCallback(flags.get("newTaskEndCallback"), task);
             }
-            if (task.getEndTimeUtc()>0) {
+            if (task.getEndTimeUtc() > 0) {
                 if (taskWasSubmittedAndNotYetEnded) {
                     // shouldn't happen
-                    log.debug("Task "+task+" has end time 
"+task.getEndTimeUtc()+" but was marked as incomplete");
+                    log.debug("Task " + task + " has end time " + 
task.getEndTimeUtc() + " but was marked as incomplete");
                 }
             } else {
                 ((TaskInternal<?>) 
task).setEndTimeUtc(System.currentTimeMillis());
@@ -1146,7 +1256,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 //clear thread _after_ endTime set, so we won't get a null 
thread when there is no end-time
                 if (RENAME_THREADS) {
                     Thread thread = task.getThread();
-                    if (thread==null) {
+                    if (thread == null) {
                         log.warn("BasicTask.afterEnd invoked without 
corresponding beforeStart");
                     } else {
                         thread.setName(threadOriginalName.get());
@@ -1154,7 +1264,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
                     }
                 }
             }
-            ((TaskInternal<?>)task).setThread(null);
+            ((TaskInternal<?>) task).setThread(null);
 
             // if uninteresting and transient and scheduled, go ahead and 
remove from task tags also, so it won't be reported as GC'd
             if (BrooklynTaskTags.isTransient(task) && 
UNINTERESTING_TASK_NAMES.contains(task.getDisplayName()) && 
task.getSubmittedByTask() instanceof ScheduledTask) {
@@ -1163,7 +1273,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
 
         } finally {
             try {
-                if (error!=null) {
+                if (error != null) {
                     /* we throw, after logging debug.
                      * the throw means the error is available for task 
submitters to monitor.
                      * however it is possible no one is monitoring it, in 
which case we will have debug logging only for errors.
@@ -1174,26 +1284,28 @@ public class BasicExecutionManager implements 
ExecutionManager {
                     if (log.isDebugEnabled()) {
                         // debug only here, because most submitters will 
handle failures
                         if (error instanceof InterruptedException || error 
instanceof RuntimeInterruptedException) {
-                            log.debug("Detected interruption on task "+task+" 
(rethrowing)" +
-                                    (Strings.isNonBlank(error.getMessage()) ? 
": "+error.getMessage() : ""));
+                            log.debug("Detected interruption on task " + task 
+ " (rethrowing)" +
+                                    (Strings.isNonBlank(error.getMessage()) ? 
": " + error.getMessage() : ""));
                         } else if (error instanceof NullPointerException ||
                                 error instanceof IndexOutOfBoundsException ||
                                 error instanceof ClassCastException) {
-                            log.debug("Exception running task "+task+" 
(rethrowing): "+error, error);
+                            log.debug("Exception running task " + task + " 
(rethrowing): " + error, error);
                         } else {
-                            log.debug("Exception running task "+task+" 
(rethrowing): "+error);
+                            log.debug("Exception running task " + task + " 
(rethrowing): " + error);
                         }
                         if (log.isTraceEnabled()) {
-                            log.trace("Trace for exception running task 
"+task+" (rethrowing): "+error, error);
+                            log.trace("Trace for exception running task " + 
task + " (rethrowing): " + error, error);
                         }
                     }
                     throw Exceptions.propagate(error);
                 }
             } finally {
-                synchronized (task) { task.notifyAll(); }
+                synchronized (task) {
+                    task.notifyAll();
+                }
                 if (taskWasSubmittedAndNotYetEnded) {
                     // prevent from running twice on cancellation after start
-                    ((TaskInternal<?>)task).runListeners();
+                    ((TaskInternal<?>) task).runListeners();
                 }
             }
         }
@@ -1203,22 +1315,22 @@ public class BasicExecutionManager implements 
ExecutionManager {
     public TaskScheduler getTaskSchedulerForTag(Object tag) {
         return schedulerByTag.get(tag);
     }
-    
+
     public void setTaskSchedulerForTag(Object tag, Class<? extends 
TaskScheduler> scheduler) {
         synchronized (schedulerByTag) {
             TaskScheduler old = getTaskSchedulerForTag(tag);
-            if (old!=null) {
+            if (old != null) {
                 if (scheduler.isAssignableFrom(old.getClass())) {
                     /* already have such an instance */
                     return;
                 }
                 //might support multiple in future...
-                throw new IllegalStateException("Not allowed to set multiple 
TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new 
"+scheduler+")");
+                throw new IllegalStateException("Not allowed to set multiple 
TaskSchedulers on ExecutionManager tag (tag " + tag + ", has " + old + ", 
setting new " + scheduler + ")");
             }
             try {
                 TaskScheduler schedulerI = scheduler.newInstance();
                 // allow scheduler to have a nice name, for logging etc
-                if (schedulerI instanceof CanSetName) 
((CanSetName)schedulerI).setName(""+tag);
+                if (schedulerI instanceof CanSetName) ((CanSetName) 
schedulerI).setName("" + tag);
                 setTaskSchedulerForTag(tag, schedulerI);
             } catch (InstantiationException e) {
                 throw Exceptions.propagate(e);
@@ -1227,10 +1339,10 @@ public class BasicExecutionManager implements 
ExecutionManager {
             }
         }
     }
-    
+
     /**
      * Defines a {@link TaskScheduler} to run on all subsequently submitted 
jobs with the given tag.
-     *
+     * <p>
      * Maximum of one allowed currently. Resubmissions of the same scheduler 
(or scheduler class)
      * allowed. If changing, you must call {@link 
#clearTaskSchedulerForTag(Object)} between the two.
      *
@@ -1241,9 +1353,9 @@ public class BasicExecutionManager implements 
ExecutionManager {
             scheduler.injectExecutor(runner);
 
             Object old = schedulerByTag.put(tag, scheduler);
-            if (old!=null && old!=scheduler) {
+            if (old != null && old != scheduler) {
                 //might support multiple in future...
-                throw new IllegalStateException("Not allowed to set multiple 
TaskSchedulers on ExecutionManager tag (tag "+tag+")");
+                throw new IllegalStateException("Not allowed to set multiple 
TaskSchedulers on ExecutionManager tag (tag " + tag + ")");
             }
         }
     }
@@ -1257,10 +1369,10 @@ public class BasicExecutionManager implements 
ExecutionManager {
     public boolean clearTaskSchedulerForTag(Object tag) {
         synchronized (schedulerByTag) {
             Object old = schedulerByTag.remove(tag);
-            return (old!=null);
+            return (old != null);
         }
     }
-    
+
     @VisibleForTesting
     public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() {
         return schedulerByTag;
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java 
b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index 4422209..b6a7029 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -18,7 +18,10 @@
  */
 package org.apache.brooklyn.util.core.task;
 
+import org.apache.brooklyn.api.internal.BrooklynLoggingCategories;
 import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.core.BrooklynLogging;
+import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 
@@ -53,8 +56,6 @@ public class ScheduledTask extends BasicTask<Object> {
     
     final Callable<Task<?>> taskFactory;
 
-    protected BrooklynTaskLoggingMdc mdc;
-
     /**
      * Initial delay before running, set as flag in constructor; defaults to 0
      */
@@ -266,6 +267,7 @@ public class ScheduledTask extends BasicTask<Object> {
     
     @Override
     protected boolean 
doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode 
mode) {
+        BrooklynLogging.log(BrooklynLoggingCategories.TASK_LIFECYCLE_LOG, 
LoggingLevel.DEBUG, "Cancelling scheduled task "+this);
         if (nextRun!=null) {
             ((TaskInternal<?>)nextRun).cancel(mode);
             try {
diff --git 
a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java
 
b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java
index c0cde50..f207001 100644
--- 
a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java
+++ 
b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/RuntimeInterruptedException.java
@@ -33,6 +33,11 @@ public class RuntimeInterruptedException extends 
RuntimeException {
 
     private static final long serialVersionUID = 915050245927866175L;
 
+    public RuntimeInterruptedException(String msg) {
+        super(msg);
+        Thread.currentThread().interrupt();
+    }
+
     public RuntimeInterruptedException(InterruptedException cause) {
         super(cause);
         Thread.currentThread().interrupt();

Reply via email to