Repository: samza
Updated Branches:
  refs/heads/master 2aa9f893d -> 4aae9ad8c


SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4aae9ad8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4aae9ad8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4aae9ad8

Branch: refs/heads/master
Commit: 4aae9ad8ccd1c5ff9fdecb812d16c5aaf91c4b92
Parents: 2aa9f89
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Oct 19 12:04:52 2016 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Wed Oct 19 12:05:24 2016 -0700

----------------------------------------------------------------------
 .../apache/samza/container/RunLoopFactory.java  |  10 +-
 .../disk/WatermarkDiskQuotaPolicy.java          |   8 +-
 .../org/apache/samza/task/AsyncRunLoop.java     |  70 ++++--
 .../org/apache/samza/util/Throttleable.java     |  48 ++++
 .../apache/samza/util/ThrottlingExecutor.java   |  25 +-
 .../apache/samza/util/ThrottlingScheduler.java  | 148 +++++++++++
 .../org/apache/samza/container/RunLoop.scala    |  16 +-
 .../apache/samza/container/SamzaContainer.scala |  32 +--
 .../org/apache/samza/task/TestAsyncRunLoop.java |  21 +-
 .../samza/util/TestThrottlingScheduler.java     | 246 +++++++++++++++++++
 .../apache/samza/container/TestRunLoop.scala    |  23 +-
 .../samza/container/TestSamzaContainer.scala    |  29 +--
 12 files changed, 559 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index a789d04..609a956 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.container;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.TaskConfig;
@@ -32,8 +31,8 @@ import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
 import scala.runtime.AbstractFunction1;
 
-import static org.apache.samza.util.Utils.defaultValue;
 import static org.apache.samza.util.Utils.defaultClock;
+import static org.apache.samza.util.Utils.defaultValue;
 
 /**
  * Factory class to create runloop for a Samza task, based on the type
@@ -49,7 +48,7 @@ public class RunLoopFactory {
   public static Runnable 
createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> 
taskInstances,
       SystemConsumers consumerMultiplexer,
       ExecutorService threadPool,
-      Executor executor,
+      long maxThrottlingDelayMs,
       SamzaContainerMetrics containerMetrics,
       TaskConfig config) {
 
@@ -81,10 +80,10 @@ public class RunLoopFactory {
         streamTaskInstances,
         consumerMultiplexer,
         containerMetrics,
+        maxThrottlingDelayMs,
         taskWindowMs,
         taskCommitMs,
-        defaultClock(),
-        executor);
+        defaultClock());
     } else {
       Integer taskMaxConcurrency = 
config.getMaxConcurrency().getOrElse(defaultValue(1));
 
@@ -106,6 +105,7 @@ public class RunLoopFactory {
         taskWindowMs,
         taskCommitMs,
         callbackTimeout,
+        maxThrottlingDelayMs,
         containerMetrics);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
 
b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
index 21fbca2..7221318 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java
@@ -19,7 +19,7 @@
 
 package org.apache.samza.container.disk;
 
-import org.apache.samza.util.ThrottlingExecutor;
+import org.apache.samza.util.Throttleable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,7 +105,7 @@ public class WatermarkDiskQuotaPolicy implements 
DiskQuotaPolicy {
 
     // Validate entries
     double lastHighWaterMark = 1.0;
-    double lastWorkFactor = ThrottlingExecutor.MAX_WORK_FACTOR;
+    double lastWorkFactor = Throttleable.MAX_WORK_FACTOR;
     for (int i = 0; i < entries.size(); ++i) {
       final Entry entry = entries.get(i);
 
@@ -123,10 +123,10 @@ public class WatermarkDiskQuotaPolicy implements 
DiskQuotaPolicy {
             dumpPolicyEntries(entries));
       }
 
-      if (entry.getWorkFactor() < ThrottlingExecutor.MIN_WORK_FACTOR) {
+      if (entry.getWorkFactor() < Throttleable.MIN_WORK_FACTOR) {
         throw new IllegalArgumentException("Policy entry " + i +
             " has work factor (" + entry.getWorkFactor() +
-            ") < minimum work factor (" + ThrottlingExecutor.MIN_WORK_FACTOR + 
"):" +
+            ") < minimum work factor (" + Throttleable.MIN_WORK_FACTOR + "):" +
             dumpPolicyEntries(entries));
       }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 77eceea..8fac815 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -42,6 +42,8 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Throttleable;
+import org.apache.samza.util.ThrottlingScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
@@ -50,7 +52,7 @@ import scala.collection.JavaConversions;
 /**
  * The AsyncRunLoop supports multithreading execution of Samza {@link 
AsyncStreamTask}s.
  */
-public class AsyncRunLoop implements Runnable {
+public class AsyncRunLoop implements Runnable, Throttleable {
   private static final Logger log = 
LoggerFactory.getLogger(AsyncRunLoop.class);
 
   private final Map<TaskName, AsyncTaskWorker> taskWorkers;
@@ -67,6 +69,7 @@ public class AsyncRunLoop implements Runnable {
   private final SamzaContainerMetrics containerMetrics;
   private final ScheduledExecutorService workerTimer;
   private final ScheduledExecutorService callbackTimer;
+  private final ThrottlingScheduler callbackExecutor;
   private volatile boolean shutdownNow = false;
   private volatile Throwable throwable = null;
 
@@ -77,6 +80,7 @@ public class AsyncRunLoop implements Runnable {
       long windowMs,
       long commitMs,
       long callbackTimeoutMs,
+      long maxThrottlingDelayMs,
       SamzaContainerMetrics containerMetrics) {
 
     this.threadPool = threadPool;
@@ -87,6 +91,7 @@ public class AsyncRunLoop implements Runnable {
     this.maxConcurrency = maxConcurrency;
     this.callbackTimeoutMs = callbackTimeoutMs;
     this.callbackTimer = (callbackTimeoutMs > 0) ? 
Executors.newSingleThreadScheduledExecutor() : null;
+    this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs);
     this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet());
     this.latch = new Object();
     this.workerTimer = Executors.newSingleThreadScheduledExecutor();
@@ -159,10 +164,21 @@ public class AsyncRunLoop implements Runnable {
       }
     } finally {
       workerTimer.shutdown();
+      callbackExecutor.shutdown();
       if (callbackTimer != null) callbackTimer.shutdown();
     }
   }
 
+  @Override
+  public void setWorkFactor(double workFactor) {
+    callbackExecutor.setWorkFactor(workFactor);
+  }
+
+  @Override
+  public double getWorkFactor() {
+    return callbackExecutor.getWorkFactor();
+  }
+
   public void shutdown() {
     shutdownNow = true;
   }
@@ -480,30 +496,36 @@ public class AsyncRunLoop implements Runnable {
      * * @param callback AsyncSteamTask.processAsync callback
      */
     @Override
-    public void onComplete(TaskCallback callback) {
-      try {
-        state.doneProcess();
-        TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
-        containerMetrics.processNs().update(System.nanoTime() - 
callbackImpl.timeCreatedNs);
-        log.trace("Got callback complete for task {}, ssp {}", 
callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
-
-        TaskCallbackImpl callbackToUpdate = 
callbackManager.updateCallback(callbackImpl, true);
-        if (callbackToUpdate != null) {
-          IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
-          log.trace("Update offset for ssp {}, offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset());
-
-          // update offset
-          task.offsetManager().update(task.taskName(), 
envelope.getSystemStreamPartition(), envelope.getOffset());
-
-          // update coordinator
-          coordinatorRequests.update(callbackToUpdate.coordinator);
+    public void onComplete(final TaskCallback callback) {
+      long workNanos = System.nanoTime() - ((TaskCallbackImpl) 
callback).timeCreatedNs;
+      callbackExecutor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            state.doneProcess();
+            TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
+            containerMetrics.processNs().update(System.nanoTime() - 
callbackImpl.timeCreatedNs);
+            log.trace("Got callback complete for task {}, ssp {}", 
callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition());
+
+            TaskCallbackImpl callbackToUpdate = 
callbackManager.updateCallback(callbackImpl, true);
+            if (callbackToUpdate != null) {
+              IncomingMessageEnvelope envelope = callbackToUpdate.envelope;
+              log.trace("Update offset for ssp {}, offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset());
+
+              // update offset
+              task.offsetManager().update(task.taskName(), 
envelope.getSystemStreamPartition(), envelope.getOffset());
+
+              // update coordinator
+              coordinatorRequests.update(callbackToUpdate.coordinator);
+            }
+          } catch (Throwable t) {
+            log.error(t.getMessage(), t);
+            abort(t);
+          } finally {
+            resume();
+          }
         }
-      } catch (Throwable t) {
-        log.error(t.getMessage(), t);
-        abort(t);
-      } finally {
-        resume();
-      }
+      }, workNanos);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/util/Throttleable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/Throttleable.java 
b/samza-core/src/main/java/org/apache/samza/util/Throttleable.java
new file mode 100644
index 0000000..8d1d8ea
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/Throttleable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+/**
+ * An object that performs work and optionally slows the rate of execution. By 
default,
+ * work will not be throttled. Work can be throttled by setting work factor to 
less than
+ * {@link #MAX_WORK_FACTOR}.
+ */
+public interface Throttleable {
+  double MAX_WORK_FACTOR = 1.0;
+  double MIN_WORK_FACTOR = 0.001;
+
+  /**
+   * Sets the work factor for this object. A work factor of {@code 1.0} 
indicates that execution
+   * should proceed at full throughput. A work factor of less than {@code 1.0} 
will introduce
+   * delays into the execution to approximate the requested work factor. For 
example, if the
+   * work factor is {@code 0.7} then approximately 70% of the execution time 
will be spent
+   * executing the work while 30% will be spent idle.
+   *
+   * @param workFactor the work factor to set for this throttler.
+   */
+  void setWorkFactor(double workFactor);
+
+  /**
+   * Returns the current work factor in use.
+   * @see #setWorkFactor(double)
+   * @return the current work factor.
+   */
+  double getWorkFactor();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index afcc4c5..d1298fc 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -30,10 +30,7 @@ import java.util.concurrent.TimeUnit;
  * This class is *NOT* thread-safe. It is intended to be used from a single 
thread. However, the
  * work factor may be set from any thread.
  */
-public class ThrottlingExecutor implements Executor {
-  public static final double MAX_WORK_FACTOR = 1.0;
-  public static final double MIN_WORK_FACTOR = 0.001;
-
+public class ThrottlingExecutor implements Throttleable, Executor {
   private final long maxDelayNanos;
   private final HighResolutionClock clock;
 
@@ -54,7 +51,7 @@ public class ThrottlingExecutor implements Executor {
    * is less than 1.0) this command may optionally insert a delay before 
returning to satisfy the
    * requested work factor.
    * <p>
-   * This method will not operate correct if used by more than one thread.
+   * This method will not operate correctly if used by more than one thread.
    *
    * @param command the work to execute
    */
@@ -85,15 +82,7 @@ public class ThrottlingExecutor implements Executor {
     }
   }
 
-  /**
-   * Sets the work factor for this executor. A work factor of {@code 1.0} 
indicates that execution
-   * should proceed at full throughput. A work factor of less than {@code 1.0} 
will introduce
-   * delays into the {@link #execute(Runnable)} call to approximate the 
requested work factor. For
-   * example, if the work factor is {@code 0.7} then approximately 70% of the 
execute call will be
-   * spent executing the supplied command while 30% will be spent idle.
-   *
-   * @param workFactor the work factor to set for this executor.
-   */
+  @Override
   public void setWorkFactor(double workFactor) {
     if (workFactor < MIN_WORK_FACTOR) {
       throw new IllegalArgumentException("Work factor must be >= " + 
MIN_WORK_FACTOR);
@@ -105,11 +94,7 @@ public class ThrottlingExecutor implements Executor {
     workToIdleFactor = (1.0 - workFactor) / workFactor;
   }
 
-  /**
-   * Returns the current work factor in use.
-   * @see #setWorkFactor(double)
-   * @return the current work factor.
-   */
+  @Override
   public double getWorkFactor() {
     return 1.0 / (workToIdleFactor + 1.0);
   }
@@ -137,4 +122,4 @@ public class ThrottlingExecutor implements Executor {
   void setPendingNanos(long pendingNanos) {
     this.pendingNanos = pendingNanos;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java 
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
new file mode 100644
index 0000000..5b5780b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An object that schedules work to be performed and optionally slows the rate 
of execution.
+ * By default work submitted with {@link #schedule(Runnable, long)} will not 
be throttled. Work can
+ * be throttled by invoking {@link #setWorkFactor(double)}.
+ * <p>
+ * This class is thread-safe. It must be {@link #shutdown} after use.
+ */
+public class ThrottlingScheduler implements Throttleable {
+  private final long maxDelayNanos;
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final HighResolutionClock clock;
+
+  private final AtomicLong pendingNanos = new AtomicLong();
+  private volatile double workToIdleFactor;
+
+  public ThrottlingScheduler(long maxDelayMillis) {
+    this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
+    this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    this.clock = new SystemHighResolutionClock();
+  }
+
+  ThrottlingScheduler(long maxDelayMillis, ScheduledExecutorService 
scheduledExecutorService,
+      HighResolutionClock clock) {
+    this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.clock = clock;
+  }
+
+  /**
+   * This method may be used to throttle asynchronous processing by delaying 
the work completion callback.
+   * <p>
+   * When throttling is disabled, executes the given completion callback on the current thread.
+   * If throttling is enabled (the work factor is less than 1.0) this method may instead hand the
+   * callback to the internal scheduler to be run after a delay that satisfies the requested
+   * work factor.
+   *
+   * @param callback the callback to complete asynchronous work
+   * @param workDurationNs the duration of asynchronous work in nanoseconds
+   */
+  public void schedule(final Runnable callback, final long workDurationNs) {
+    final double currentWorkToIdleFactor = workToIdleFactor;
+
+    // If we're not throttling, do not get clock time, etc. This substantially 
reduces the overhead
+    // per invocation of this feature.
+    if (currentWorkToIdleFactor == 0.0) {
+      callback.run();
+    } else {
+      final long delay = Math.min(maxDelayNanos, (long) (workDurationNs * 
currentWorkToIdleFactor));
+
+      // NOTE: we accumulate pending delay nanos here, but reduce the pending 
delay nanos after
+      // the delay operation (if applicable), so they do not continue to grow.
+      addToPendingNanos(delay);
+
+      if (pendingNanos.get() < 0) {
+        callback.run();
+      } else {
+        final long startTimeNs = clock.nanoTime();
+        scheduledExecutorService.schedule(new Runnable() {
+          @Override
+          public void run() {
+            final long actualDelay = clock.nanoTime() - startTimeNs;
+            addToPendingNanos(-actualDelay);
+            callback.run();
+          }
+        }, delay, TimeUnit.NANOSECONDS);
+      }
+    }
+  }
+
+  private void addToPendingNanos(final long amount) {
+    long currentValue;
+    long newValue;
+    do {
+      currentValue = pendingNanos.get();
+      newValue = Util.clampAdd(currentValue, amount);
+    } while (!pendingNanos.compareAndSet(currentValue, newValue));
+  }
+
+  @Override
+  public void setWorkFactor(double workFactor) {
+    if (workFactor < MIN_WORK_FACTOR) {
+      throw new IllegalArgumentException("Work factor must be >= " + 
MIN_WORK_FACTOR);
+    }
+    if (workFactor > MAX_WORK_FACTOR) {
+      throw new IllegalArgumentException("Work factor must be <= " + 
MAX_WORK_FACTOR);
+    }
+
+    workToIdleFactor = (1.0 - workFactor) / workFactor;
+  }
+
+  @Override
+  public double getWorkFactor() {
+    return 1.0 / (workToIdleFactor + 1.0);
+  }
+
+  public void shutdown() {
+    scheduledExecutorService.shutdown();
+  }
+
+  /**
+   * Returns the total amount of delay (in nanoseconds) that needs to be 
applied to subsequent work.
+   * Alternatively this can be thought to capture the error between expected 
delay and actual
+   * applied delay. This accounts for variance in the precision of the delay 
mechanism,
+   * which may vary from platform to platform.
+   * <p>
+   * This is required for test purposes only.
+   *
+   * @return the total amount of delay (in nanoseconds) that needs to be 
applied to subsequent work.
+   */
+  long getPendingNanos() {
+    return pendingNanos.get();
+  }
+
+  /**
+   * A convenience method for tests that allows the pending delay for this scheduler to be set
+   * explicitly.
+   *
+   * @param pendingNanos the pending nanos to set.
+   */
+  void setPendingNanos(long pendingNanos) {
+    this.pendingNanos.set(pendingNanos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 538ebb8..7df7d88 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -19,14 +19,11 @@
 
 package org.apache.samza.container
 
-import java.util.concurrent.Executor
-
 import org.apache.samza.task.CoordinatorRequests
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, 
SystemStreamPartition}
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.StreamTask
-import org.apache.samza.util.Logging
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, 
TimerUtils}
 
 import scala.collection.JavaConversions._
 
@@ -43,12 +40,13 @@ class RunLoop (
   val taskInstances: Map[TaskName, TaskInstance[StreamTask]],
   val consumerMultiplexer: SystemConsumers,
   val metrics: SamzaContainerMetrics,
+  val maxThrottlingDelayMs: Long,
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
-  val clock: () => Long = { System.nanoTime },
-  val executor: Executor = new SameThreadExecutor()) extends Runnable with 
TimerUtils with Logging {
+  val clock: () => Long = { System.nanoTime }) extends Runnable with 
Throttleable with TimerUtils with Logging {
 
   private val metricsMsOffset = 1000000L
+  private val executor = new ThrottlingExecutor(maxThrottlingDelayMs)
   private var lastWindowNs = clock()
   private var lastCommitNs = clock()
   private var activeNs = 0L
@@ -96,7 +94,11 @@ class RunLoop (
     }
   }
 
-  def shutdown = {
+  def setWorkFactor(workFactor: Double): Unit = 
executor.setWorkFactor(workFactor)
+
+  def getWorkFactor: Double = executor.getWorkFactor
+
+  def shutdown: Unit = {
     shutdownNow = true
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 4ab4bce..e0468ee 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -44,7 +44,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory
 import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor
-import org.apache.samza.container.host.{SystemMemoryStatistics, 
SystemStatisticsMonitor, StatisticsMonitorImpl}
+import org.apache.samza.container.host.{StatisticsMonitorImpl, 
SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
@@ -74,10 +74,7 @@ import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.AsyncStreamTaskAdapter
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.Logging
-import org.apache.samza.util.ThrottlingExecutor
-import org.apache.samza.util.Util
+import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Throttleable, 
Util}
 
 import scala.collection.JavaConversions._
 
@@ -554,8 +551,15 @@ object SamzaContainer extends Logging {
       (taskName, taskInstance)
     }).toMap
 
-    val executor = new ThrottlingExecutor(
-      config.getLong("container.disk.quota.delay.max.ms", 
TimeUnit.SECONDS.toMillis(1)))
+    val maxThrottlingDelayMs = 
config.getLong("container.disk.quota.delay.max.ms", 
TimeUnit.SECONDS.toMillis(1))
+
+    val runLoop = RunLoopFactory.createRunLoop(
+      taskInstances,
+      consumerMultiplexer,
+      taskThreadPool,
+      maxThrottlingDelayMs,
+      samzaContainerMetrics,
+      config)
 
     val memoryStatisticsMonitor : SystemStatisticsMonitor = new 
StatisticsMonitorImpl()
     memoryStatisticsMonitor.registerListener(new 
SystemStatisticsMonitor.Listener {
@@ -582,8 +586,8 @@ object SamzaContainer extends Logging {
       diskSpaceMonitor.registerListener(new Listener {
         override def onUpdate(diskUsageBytes: Long): Unit = {
           val newWorkRate = diskQuotaPolicy.apply(1.0 - 
(diskUsageBytes.toDouble / diskQuotaBytes))
-          executor.setWorkFactor(newWorkRate)
-          samzaContainerMetrics.executorWorkFactor.set(executor.getWorkFactor)
+          runLoop.asInstanceOf[Throttleable].setWorkFactor(newWorkRate)
+          
samzaContainerMetrics.executorWorkFactor.set(runLoop.asInstanceOf[Throttleable].getWorkFactor)
           samzaContainerMetrics.diskUsageBytes.set(diskUsageBytes)
         }
       })
@@ -593,13 +597,6 @@ object SamzaContainer extends Logging {
       info(s"Disk quotas disabled because polling interval is not set 
($DISK_POLL_INTERVAL_KEY)")
     }
 
-    val runLoop = RunLoopFactory.createRunLoop(
-      taskInstances,
-      consumerMultiplexer,
-      taskThreadPool,
-      executor,
-      samzaContainerMetrics,
-      config)
 
     info("Samza container setup complete.")
 
@@ -862,7 +859,6 @@ class SamzaContainer(
     offsetManager.stop
   }
 
-
   def shutdownMetrics {
     info("Shutting down metrics reporters.")
 
@@ -896,6 +892,4 @@ class SamzaContainer(
       hostStatisticsMonitor.stop()
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 3263e54..6000ffa 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
@@ -63,7 +62,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestAsyncRunLoop {
-
   Map<TaskName, TaskInstance<AsyncStreamTask>> tasks;
   ExecutorService executor;
   SystemConsumers consumerMultiplexer;
@@ -72,6 +70,7 @@ public class TestAsyncRunLoop {
   long windowMs;
   long commitMs;
   long callbackTimeoutMs;
+  long maxThrottlingDelayMs;
   int maxMessagesInFlight;
   TaskCoordinator.RequestScope commitRequest;
   TaskCoordinator.RequestScope shutdownRequest;
@@ -101,6 +100,7 @@ public class TestAsyncRunLoop {
         windowMs,
         commitMs,
         callbackTimeoutMs,
+        maxThrottlingDelayMs,
         containerMetrics);
   }
 
@@ -116,7 +116,6 @@ public class TestAsyncRunLoop {
     return createTaskInstance(task, taskName, ssp, offsetManager, 
consumerMultiplexer);
   }
 
-
   ExecutorService callbackExecutor;
   void triggerCallback(final TestTask task, final TaskCallback callback, final 
boolean success) {
     callbackExecutor.submit(new Runnable() {
@@ -141,7 +140,6 @@ public class TestAsyncRunLoop {
     void run(TaskCallback callback);
   }
 
-
   class TestTask implements AsyncStreamTask, WindowableTask, 
EndOfStreamListenerTask {
     boolean shutdown = false;
     boolean commit = false;
@@ -193,7 +191,6 @@ public class TestAsyncRunLoop {
     }
   }
 
-
   @Before
   public void setup() {
     executor = null;
@@ -217,7 +214,6 @@ public class TestAsyncRunLoop {
     tasks.put(taskName1, t1);
   }
 
-
   @Test
   public void testProcessMultipleTasks() throws Exception {
     AsyncRunLoop runLoop = createRunLoop();
@@ -234,7 +230,6 @@ public class TestAsyncRunLoop {
     assertEquals(2L, containerMetrics.processes().getCount());
   }
 
-
   @Test
   public void testProcessInOrder() throws Exception {
     AsyncRunLoop runLoop = createRunLoop();
@@ -251,7 +246,6 @@ public class TestAsyncRunLoop {
     assertEquals(3L, containerMetrics.processes().getCount());
   }
 
-
   private TestCode buildOutofOrderCallback() {
     final CountDownLatch latch = new CountDownLatch(1);
     return new TestCode() {
@@ -447,8 +441,6 @@ public class TestAsyncRunLoop {
     TestTask mockStreamTask1 = new TestTask(true, false, false);
     TestTask mockStreamTask2 = new TestTask(true, false, false);
 
-    Config config = new MapConfig();
-
     Partition p1 = new Partition(1);
     Partition p2 = new Partition(2);
     SystemStreamPartition ssp1 = new SystemStreamPartition("system1", 
"stream1", p1);
@@ -464,8 +456,6 @@ public class TestAsyncRunLoop {
     messageList.add(envelope3);
     sspMap.put(ssp2, messageList);
 
-
-
     SystemConsumer mockConsumer = mock(SystemConsumer.class);
     when(mockConsumer.poll((Set<SystemStreamPartition>) anyObject(), 
anyLong())).thenReturn(sspMap);
 
@@ -496,20 +486,17 @@ public class TestAsyncRunLoop {
     taskInstance2.registerConsumers();
     consumers.start();
 
-    AsyncRunLoop runLoop =     new AsyncRunLoop(tasks,
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks,
         executor,
         consumers,
         maxMessagesInFlight,
         windowMs,
         commitMs,
         callbackTimeoutMs,
+        maxThrottlingDelayMs,
         containerMetrics);
 
-
     runLoop.run();
     callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
-
-
-
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java 
b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java
new file mode 100644
index 0000000..c41de70
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import static junit.framework.Assert.*;
+
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestThrottlingScheduler {
+  private static final long MAX_NANOS = Long.MAX_VALUE;
+
+  private static final Runnable NO_OP = new Runnable() {
+    @Override
+    public void run() {
+      // Do nothing.
+    }
+  };
+
+  private HighResolutionClock clock;
+  private ScheduledExecutorService scheduledExecutorService;
+  private ThrottlingScheduler throttler;
+
+  @Before
+  public void setUp() {
+    clock = Mockito.mock(HighResolutionClock.class);
+    scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class);
+    throttler = new ThrottlingScheduler(MAX_NANOS, scheduledExecutorService, 
clock);
+  }
+
+  @Test
+  public void testInitialState() {
+    // Check the ThrottlingScheduler built in setUp(), not a ThrottlingExecutor.
+    assertEquals(0, throttler.getPendingNanos());
+    assertEquals(1.0, throttler.getWorkFactor());
+  }
+
+  @Test
+  public void testSetWorkRate() {
+    throttler.setWorkFactor(1.0);
+    assertEquals(1.0, throttler.getWorkFactor());
+
+    throttler.setWorkFactor(0.5);
+    assertEquals(0.5, throttler.getWorkFactor());
+
+    throttler.setWorkFactor(Throttleable.MIN_WORK_FACTOR);
+    assertEquals(Throttleable.MIN_WORK_FACTOR, throttler.getWorkFactor());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLessThan0PercentWorkRate() {
+    throttler.setWorkFactor(-0.1); // scheduler under test, not ThrottlingExecutor
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGreaterThan100PercentWorkRate() {
+    throttler.setWorkFactor(1.1); // scheduler under test, not ThrottlingExecutor
+  }
+
+  @Test
+  public void test100PercentWorkRate() throws InterruptedException {
+    throttler.schedule(NO_OP, 1000);
+
+    assertEquals(0L, throttler.getPendingNanos());
+
+    // At 100% work rate schedule should not be called
+    Mockito.verify(scheduledExecutorService, Mockito.never())
+        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class));
+  }
+
+  @Test
+  public void test50PercentWorkRate() throws InterruptedException {
+    throttler.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    setActualDelay(workTimeNanos);
+
+    throttler.schedule(NO_OP, workTimeNanos);
+
+    // Delay time is same as work time at 50% work rate
+    verifyRequestedDelay(workTimeNanos);
+    assertEquals(0L, throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testMinWorkRate() throws InterruptedException {
+    final double workFactor = Throttleable.MIN_WORK_FACTOR;
+    throttler.setWorkFactor(workFactor);
+
+    // The math to work out how much to multiply work time to get expected 
delay time
+    double workToDelayFactor = (1.0 - workFactor) / workFactor;
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long delayTimeNanos = (long) (workToDelayFactor * workTimeNanos);
+
+    setActualDelay(delayTimeNanos);
+    throttler.schedule(NO_OP, workTimeNanos);
+
+    verifyRequestedDelay(delayTimeNanos);
+    assertEquals(0, throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testDelayOvershoot() throws InterruptedException {
+    throttler.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long expectedDelayNanos = workTimeNanos;
+    final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(6);
+
+    setActualDelay(actualDelayNanos);
+
+    throttler.schedule(NO_OP, workTimeNanos);
+
+    verifyRequestedDelay(expectedDelayNanos);
+    assertEquals(expectedDelayNanos - actualDelayNanos, 
throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testDelayUndershoot() throws InterruptedException {
+    throttler.setWorkFactor(0.5);
+
+    final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5);
+    final long expectedDelayNanos = workTimeNanos;
+    final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(4);
+
+    setActualDelay(actualDelayNanos);
+
+    throttler.schedule(NO_OP, workTimeNanos);
+
+    verifyRequestedDelay(expectedDelayNanos);
+    assertEquals(expectedDelayNanos - actualDelayNanos, 
throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testClampDelayMillis() throws InterruptedException {
+    final long maxDelayMillis = 10;
+    final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
+
+    ThrottlingScheduler throttler = new ThrottlingScheduler(maxDelayMillis, 
scheduledExecutorService, clock);
+    throttler.setWorkFactor(0.5);
+
+    setActualDelay(maxDelayNanos);
+
+    // Note work time exceeds maxDelayMillis
+    throttler.schedule(NO_OP, TimeUnit.MILLISECONDS.toNanos(100));
+
+    verifyRequestedDelay(maxDelayNanos);
+    assertEquals(0L, throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testDecreaseWorkFactor() {
+    throttler.setWorkFactor(0.5);
+    throttler.setPendingNanos(5000);
+
+    throttler.setWorkFactor(0.3);
+    assertEquals(5000, throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testOverflowOfDelayNanos() throws InterruptedException {
+    throttler.setWorkFactor(0.5);
+    throttler.setPendingNanos(Long.MAX_VALUE);
+    assertEquals(Long.MAX_VALUE, throttler.getPendingNanos());
+
+    // At a 50% work factor we'd expect work and delay to match. The function 
will try
+    // to increment the pending delay nanos, which could (but should not) 
result in overflow.
+    long workDurationNs = 5000;
+    setActualDelay(workDurationNs);
+    throttler.schedule(NO_OP, workDurationNs);
+    verifyRequestedDelay(workDurationNs);
+
+    // Expect delay nanos to be clamped during accumulation, and decreased by 
expected delay at the end.
+    assertEquals(Long.MAX_VALUE - workDurationNs, throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testNegativePendingNanos() throws InterruptedException {
+    throttler.setWorkFactor(0.5);
+    throttler.setPendingNanos(-1000);
+    assertEquals(-1000, throttler.getPendingNanos());
+
+    // Note: we do not expect the delay time to be used because work time + 
pending delay is
+    // negative.
+    throttler.schedule(NO_OP, 500);
+
+    // Should not be delayed with negative pending nanos
+    Mockito.verify(scheduledExecutorService, Mockito.never())
+        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class));
+    assertEquals(-1000 + 500, throttler.getPendingNanos());
+  }
+
+  @Test
+  public void testNegativePendingNanosGoesPositive() throws 
InterruptedException {
+    throttler.setWorkFactor(0.5);
+    long startPendingNanos = -1000;
+    throttler.setPendingNanos(startPendingNanos);
+    assertEquals(-1000, throttler.getPendingNanos());
+
+    setActualDelay(1250);
+
+    // We request a delay greater than the starting pending delay.
+    throttler.schedule(NO_OP, 1250);
+
+    verifyRequestedDelay(1250);
+
+    // Final pending delay should equal initial pending delay since we delay
+    // for the exact requested amount.
+    assertEquals(startPendingNanos, throttler.getPendingNanos());
+  }
+
+  private void setActualDelay(long actualDelayNs) {
+    Mockito.when(clock.nanoTime()).thenReturn(0L).thenReturn(actualDelayNs);
+  }
+
+  private void verifyRequestedDelay(long expectedDelayTimeNanos) throws 
InterruptedException {
+    ArgumentCaptor<Runnable> runnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+    Mockito.verify(scheduledExecutorService)
+        .schedule(runnableCaptor.capture(), 
Mockito.eq(expectedDelayTimeNanos), Mockito.eq(TimeUnit.NANOSECONDS));
+    runnableCaptor.getValue().run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index aa1a8d6..d83c7e2 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -20,6 +20,8 @@
 package org.apache.samza.container
 
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.samza.Partition
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.SlidingTimeWindowReservoir
@@ -69,7 +71,7 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
   def testProcessMessageFromChooser {
     val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics)
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, TimeUnit.SECONDS.toMillis(1))
 
     
when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new
 StopRunLoop)
     intercept[StopRunLoop] { runLoop.run }
@@ -83,7 +85,7 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
   def testNullMessageFromChooser {
     val consumers = mock[SystemConsumers]
     val map = getMockTaskInstances - taskName1 // This test only needs p0
-    val runLoop = new RunLoop(map, consumers, new SamzaContainerMetrics)
+    val runLoop = new RunLoop(map, consumers, new SamzaContainerMetrics, 
TimeUnit.SECONDS.toMillis(1))
     when(consumers.choose()).thenReturn(null).thenReturn(null).thenThrow(new 
StopRunLoop)
     intercept[StopRunLoop] { runLoop.run }
     runLoop.metrics.envelopes.getCount should equal(0L)
@@ -100,6 +102,7 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
       taskInstances = getMockTaskInstances,
       consumerMultiplexer = consumers,
       metrics = new SamzaContainerMetrics,
+      TimeUnit.SECONDS.toMillis(1),
       windowMs = 60000, // call window once per minute
       commitMs = 30000, // call commit twice per minute
       clock = () => {
@@ -120,7 +123,8 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
   def testCommitCurrentTaskManually {
     val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics,
+      TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1)
 
     
when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new
 StopRunLoop)
     stubProcess(taskInstances(taskName0), (envelope, coordinator) => 
coordinator.commit(RequestScope.CURRENT_TASK))
@@ -134,7 +138,8 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
   def testCommitAllTasksManually {
     val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics,
+      TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1)
 
     when(consumers.choose()).thenReturn(envelope0).thenThrow(new StopRunLoop)
     stubProcess(taskInstances(taskName0), (envelope, coordinator) => 
coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER))
@@ -148,7 +153,8 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
   def testShutdownOnConsensus {
     val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics,
+      TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1)
 
     
when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope0).thenReturn(envelope1)
     stubProcess(taskInstances(taskName0), (envelope, coordinator) => 
coordinator.shutdown(RequestScope.CURRENT_TASK))
@@ -163,7 +169,8 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
   def testShutdownNow {
     val taskInstances = getMockTaskInstances
     val consumers = mock[SystemConsumers]
-    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics, windowMs = -1, commitMs = -1)
+    val runLoop = new RunLoop(taskInstances, consumers, new 
SamzaContainerMetrics
+      , TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1)
 
     when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope1)
     stubProcess(taskInstances(taskName0), (envelope, coordinator) => 
coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER))
@@ -208,6 +215,7 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
       taskInstances = getMockTaskInstances,
       consumerMultiplexer = consumers,
       metrics = testMetrics,
+      TimeUnit.SECONDS.toMillis(1),
       windowMs = 1L,
       commitMs = 1L,
       clock = () => {
@@ -242,6 +250,7 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
       taskInstances = getMockTaskInstances,
       consumerMultiplexer = consumers,
       metrics = testMetrics,
+      TimeUnit.SECONDS.toMillis(1),
       commitMs = 1L,
       windowMs = 1L,
       clock = () => {
@@ -275,7 +284,7 @@ class TestRunLoop extends AssertionsForJUnit with 
MockitoSugar with ScalaTestMat
     when(ti2.systemStreamPartitions).thenReturn(Set(ssp1))
 
     val mockTaskInstances = Map(taskName0 -> ti0, taskName1 -> ti1, new 
TaskName("2") -> ti2)
-    val runLoop = new RunLoop(mockTaskInstances, null, new 
SamzaContainerMetrics)
+    val runLoop = new RunLoop(mockTaskInstances, null, new 
SamzaContainerMetrics, TimeUnit.SECONDS.toMillis(1))
     val expected = Map(ssp0 -> List(ti0), ssp1 -> List(ti1, ti2))
     assertEquals(expected, 
runLoop.getSystemStreamPartitionToTaskInstancesMapping)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index cff6b96..5895037 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,23 +19,21 @@
 
 package org.apache.samza.container
 
-import java.net.SocketTimeoutException
+import java.lang.Thread.UncaughtExceptionHandler
 import java.util
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import org.apache.samza.storage.TaskStorageManager
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.mock.MockitoSugar
 
-import scala.collection.JavaConversions._
 import org.apache.samza.Partition
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
+import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.server.{ServletBase, HttpServer, 
JobServlet}
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
+import org.apache.samza.serializers._
+import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemConsumer
@@ -55,11 +53,13 @@ import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
-import org.scalatest.junit.AssertionsForJUnit
-import java.lang.Thread.UncaughtExceptionHandler
-import org.apache.samza.serializers._
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+
+import scala.collection.JavaConversions._
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Test
@@ -193,7 +193,8 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     val runLoop = new RunLoop(
       taskInstances = Map(taskName -> taskInstance),
       consumerMultiplexer = consumerMultiplexer,
-      metrics = new SamzaContainerMetrics)
+      metrics = new SamzaContainerMetrics,
+      maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1))
     val container = new SamzaContainer(
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),

Reply via email to