Repository: brooklyn-server Updated Branches: refs/heads/master a77b33878 -> f2ce1cf92
Adds optional task startup jitter Useful for troubleshooting/testing concurrency related code. Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/42db8640 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/42db8640 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/42db8640 Branch: refs/heads/master Commit: 42db86403e0c4f96651b93a6f720c520f92bd619 Parents: fea8a6e Author: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Authored: Sun Oct 2 12:20:58 2016 +0300 Committer: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com> Committed: Tue Oct 11 09:20:11 2016 +0300 ---------------------------------------------------------------------- .../core/BrooklynFeatureEnablement.java | 9 ++++ .../util/core/task/BasicExecutionManager.java | 43 ++++++++++++++++++-- 2 files changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/42db8640/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java b/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java index f2e8a0a..8bc2ffd 100644 --- a/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java +++ b/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java @@ -84,6 +84,14 @@ public class BrooklynFeatureEnablement { public static final String FEATURE_RENAME_THREADS = "brooklyn.executionManager.renameThreads"; /** + * Add a jitter to the startup of tasks for testing concurrency code. + * Use {@code brooklyn.executionManager.jitterThreads.maxDelay} to tune the maximum time task + * startup gets delayed in milliseconds. The actual time will be a random value between [0, maxDelay). + * Default is 200 milliseconds. + */ + public static final String FEATURE_JITTER_THREADS = "brooklyn.executionManager.jitterThreads"; + + /** * When rebinding to state created from very old versions, the catalogItemId properties will be missing which * results in errors when OSGi bundles are used. When enabled the code tries to infer the catalogItemId from * <ul> @@ -149,6 +157,7 @@ public class BrooklynFeatureEnablement { setDefault(FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY, false); setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false); setDefault(FEATURE_RENAME_THREADS, false); + setDefault(FEATURE_JITTER_THREADS, false); setDefault(FEATURE_BACKWARDS_COMPATIBILITY_INFER_CATALOG_ITEM_ON_REBIND, true); setDefault(FEATURE_AUTO_FIX_CATALOG_REF_ON_REBIND, false); setDefault(FEATURE_SSH_ASYNC_EXEC, false); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/42db8640/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 8f43827..70672f9 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -19,7 +19,6 @@ package org.apache.brooklyn.util.core.task; import static com.google.common.base.Preconditions.checkNotNull; -import groovy.lang.Closure; import java.util.Collection; import java.util.Collections; @@ -41,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -70,12 +70,13 @@ import com.google.common.base.CaseFormat; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import groovy.lang.Closure; + /** * Manages the execution of atomic tasks and scheduled (recurring) tasks, * including setting tags and invoking callbacks. @@ -84,7 +85,11 @@ public class BasicExecutionManager implements ExecutionManager { private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class); private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS); - + private static final String JITTER_THREADS_MAX_DELAY_PROPERTY = BrooklynFeatureEnablement.FEATURE_JITTER_THREADS + ".maxDelay"; + + private boolean jitterThreads = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_JITTER_THREADS); + private int jitterThreadsMaxDelay = Integer.getInteger(JITTER_THREADS_MAX_DELAY_PROPERTY, 200); + private static class PerThreadCurrentTaskHolder { public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>(); } @@ -146,6 +151,10 @@ public class BasicExecutionManager implements ExecutionManager { daemonThreadFactory); delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory); + + if (jitterThreads) { + log.info("Task startup jittering enabled with a maximum of " + jitterThreadsMaxDelay + " delay."); + } } private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler { @@ -757,9 +766,23 @@ public class BasicExecutionManager implements ExecutionManager { PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task); ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis()); } + + jitterThreadStart(task); + invokeCallback(flags.get("newTaskStartCallback"), task); } + private void jitterThreadStart(Task<?> task) { + if (jitterThreads) { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(jitterThreadsMaxDelay)); + } catch (InterruptedException e) { + log.warn("Task " + task + " got cancelled before starting because of jitter."); + throw Exceptions.propagate(e); + } + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) // not ideal, such loose typing on the callback -- should prefer Function<Task,Object> // but at least it's package-private @@ -890,4 +913,18 @@ public class BasicExecutionManager implements ExecutionManager { return schedulerByTag; } + public void setJitterThreads(boolean jitterThreads) { + this.jitterThreads = jitterThreads; + if (jitterThreads) { + log.info("Task startup jittering enabled with a maximum of " + jitterThreadsMaxDelay + " delay."); + } else { + log.info("Disabled task startup jittering"); + } + } + + public void setJitterThreadsMaxDelay(int jitterThreadsMaxDelay) { + this.jitterThreadsMaxDelay = jitterThreadsMaxDelay; + log.info("Setting task startup jittering maximum delay to " + jitterThreadsMaxDelay); + } + }