Repository: brooklyn-server
Updated Branches:
  refs/heads/master a77b33878 -> f2ce1cf92


Adds optional task startup jitter

Useful for troubleshooting/testing concurrency related code.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/42db8640
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/42db8640
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/42db8640

Branch: refs/heads/master
Commit: 42db86403e0c4f96651b93a6f720c520f92bd619
Parents: fea8a6e
Author: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com>
Authored: Sun Oct 2 12:20:58 2016 +0300
Committer: Svetoslav Neykov <svetoslav.ney...@cloudsoftcorp.com>
Committed: Tue Oct 11 09:20:11 2016 +0300

----------------------------------------------------------------------
 .../core/BrooklynFeatureEnablement.java         |  9 ++++
 .../util/core/task/BasicExecutionManager.java   | 43 ++++++++++++++++++--
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/42db8640/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java 
b/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java
index f2e8a0a..8bc2ffd 100644
--- a/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java
+++ b/core/src/main/java/org/apache/brooklyn/core/BrooklynFeatureEnablement.java
@@ -84,6 +84,14 @@ public class BrooklynFeatureEnablement {
     public static final String FEATURE_RENAME_THREADS = 
"brooklyn.executionManager.renameThreads";
 
     /**
+     * Add a jitter to the startup of tasks for testing concurrency code.
+     * Use {@code brooklyn.executionManager.jitterThreads.maxDelay} to tune 
the maximum time task
+     * startup gets delayed in milliseconds. The actual time will be a random 
value between [0, maxDelay).
+     * Default is 200 milliseconds.
+     */
+    public static final String FEATURE_JITTER_THREADS = 
"brooklyn.executionManager.jitterThreads";
+
+    /**
      * When rebinding to state created from very old versions, the 
catalogItemId properties will be missing which
      * results in errors when OSGi bundles are used. When enabled the code 
tries to infer the catalogItemId from
      * <ul>
@@ -149,6 +157,7 @@ public class BrooklynFeatureEnablement {
         setDefault(FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY, false);
         setDefault(FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, false);
         setDefault(FEATURE_RENAME_THREADS, false);
+        setDefault(FEATURE_JITTER_THREADS, false);
         
setDefault(FEATURE_BACKWARDS_COMPATIBILITY_INFER_CATALOG_ITEM_ON_REBIND, true);
         setDefault(FEATURE_AUTO_FIX_CATALOG_REF_ON_REBIND, false);
         setDefault(FEATURE_SSH_ASYNC_EXEC, false);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/42db8640/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 8f43827..70672f9 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -19,7 +19,6 @@
 package org.apache.brooklyn.util.core.task;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import groovy.lang.Closure;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -41,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,12 +70,13 @@ import com.google.common.base.CaseFormat;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ExecutionList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import groovy.lang.Closure;
+
 /**
  * Manages the execution of atomic tasks and scheduled (recurring) tasks,
  * including setting tags and invoking callbacks.
@@ -84,7 +85,11 @@ public class BasicExecutionManager implements 
ExecutionManager {
     private static final Logger log = 
LoggerFactory.getLogger(BasicExecutionManager.class);
 
     private static final boolean RENAME_THREADS = 
BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS);
-    
+    /** Name of the property read via {@link Integer#getInteger(String, int)} to obtain the maximum startup jitter. */
+    private static final String JITTER_THREADS_MAX_DELAY_PROPERTY = BrooklynFeatureEnablement.FEATURE_JITTER_THREADS + ".maxDelay";
+
+    // Both fields are written by the public setters and read on the task start path,
+    // possibly from different threads; volatile makes an update visible to the reader.
+    /** Whether task startup is delayed by a random amount; initialised from the feature flag. */
+    private volatile boolean jitterThreads = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_JITTER_THREADS);
+    /** Exclusive upper bound passed to the random delay generator (used as a Thread.sleep argument); defaults to 200. */
+    private volatile int jitterThreadsMaxDelay = Integer.getInteger(JITTER_THREADS_MAX_DELAY_PROPERTY, 200);
+
     private static class PerThreadCurrentTaskHolder {
         public static final ThreadLocal<Task<?>> perThreadCurrentTask = new 
ThreadLocal<Task<?>>();
     }
@@ -146,6 +151,10 @@ public class BasicExecutionManager implements 
ExecutionManager {
                 daemonThreadFactory);
             
         delayedRunner = new ScheduledThreadPoolExecutor(1, 
daemonThreadFactory);
+
+        if (jitterThreads) {
+            log.info("Task startup jittering enabled with a maximum of " + 
jitterThreadsMaxDelay + " delay.");
+        }
     }
     
     private final static class UncaughtExceptionHandlerImplementation 
implements Thread.UncaughtExceptionHandler {
@@ -757,9 +766,23 @@ public class BasicExecutionManager implements 
ExecutionManager {
             PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
             
((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
         }
+
+        jitterThreadStart(task);
+
         invokeCallback(flags.get("newTaskStartCallback"), task);
     }
 
+    /**
+     * Puts the calling thread to sleep for a random time in [0, jitterThreadsMaxDelay) milliseconds
+     * when startup jittering is enabled; returns immediately otherwise.
+     *
+     * @param task the task about to start; only used in the warning logged if the sleep is interrupted
+     */
+    private void jitterThreadStart(Task<?> task) {
+        // Read the bound once so that a concurrent setJitterThreadsMaxDelay() cannot change it between check and use.
+        int maxDelay = jitterThreadsMaxDelay;
+        // ThreadLocalRandom.nextInt(bound) throws IllegalArgumentException for bound <= 0;
+        // a non-positive maximum is treated as "no delay" instead of failing the task start.
+        if (!jitterThreads || maxDelay <= 0) {
+            return;
+        }
+        try {
+            Thread.sleep(ThreadLocalRandom.current().nextInt(maxDelay));
+        } catch (InterruptedException e) {
+            log.warn("Task " + task + " got cancelled before starting because of jitter.");
+            throw Exceptions.propagate(e);
+        }
+    }
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     // not ideal, such loose typing on the callback -- should prefer 
Function<Task,Object>
     // but at least it's package-private
@@ -890,4 +913,18 @@ public class BasicExecutionManager implements 
ExecutionManager {
         return schedulerByTag;
     }
 
+    /**
+     * Turns task startup jittering on or off for this execution manager and logs the new state.
+     *
+     * @param jitterThreads {@code true} to delay task startup by a random amount, {@code false} to disable the delay
+     */
+    public void setJitterThreads(boolean jitterThreads) {
+        this.jitterThreads = jitterThreads;
+        if (!jitterThreads) {
+            log.info("Disabled task startup jittering");
+            return;
+        }
+        log.info("Task startup jittering enabled with a maximum of " + jitterThreadsMaxDelay + " delay.");
+    }
+
+    /**
+     * Stores the upper bound used when picking the random startup delay and logs the new value.
+     *
+     * @param jitterThreadsMaxDelay the new maximum delay
+     */
+    public void setJitterThreadsMaxDelay(int jitterThreadsMaxDelay) {
+        final int newMaxDelay = jitterThreadsMaxDelay;
+        this.jitterThreadsMaxDelay = newMaxDelay;
+        log.info("Setting task startup jittering maximum delay to " + newMaxDelay);
+    }
+
 }

Reply via email to