Repository: samza Updated Branches: refs/heads/master 2aa9f893d -> 4aae9ad8c
SAMZA-1017 - Added disk quota based throttling to AsyncRunLoop Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4aae9ad8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4aae9ad8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4aae9ad8 Branch: refs/heads/master Commit: 4aae9ad8ccd1c5ff9fdecb812d16c5aaf91c4b92 Parents: 2aa9f89 Author: Prateek Maheshwari <[email protected]> Authored: Wed Oct 19 12:04:52 2016 -0700 Committer: Xinyu Liu <[email protected]> Committed: Wed Oct 19 12:05:24 2016 -0700 ---------------------------------------------------------------------- .../apache/samza/container/RunLoopFactory.java | 10 +- .../disk/WatermarkDiskQuotaPolicy.java | 8 +- .../org/apache/samza/task/AsyncRunLoop.java | 70 ++++-- .../org/apache/samza/util/Throttleable.java | 48 ++++ .../apache/samza/util/ThrottlingExecutor.java | 25 +- .../apache/samza/util/ThrottlingScheduler.java | 148 +++++++++++ .../org/apache/samza/container/RunLoop.scala | 16 +- .../apache/samza/container/SamzaContainer.scala | 32 +-- .../org/apache/samza/task/TestAsyncRunLoop.java | 21 +- .../samza/util/TestThrottlingScheduler.java | 246 +++++++++++++++++++ .../apache/samza/container/TestRunLoop.scala | 23 +- .../samza/container/TestSamzaContainer.scala | 29 +-- 12 files changed, 559 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index a789d04..609a956 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -19,7 +19,6 @@ package org.apache.samza.container; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import org.apache.samza.SamzaException; import org.apache.samza.config.TaskConfig; @@ -32,8 +31,8 @@ import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; import scala.runtime.AbstractFunction1; -import static org.apache.samza.util.Utils.defaultValue; import static org.apache.samza.util.Utils.defaultClock; +import static org.apache.samza.util.Utils.defaultValue; /** * Factory class to create runloop for a Samza task, based on the type @@ -49,7 +48,7 @@ public class RunLoopFactory { public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance<?>> taskInstances, SystemConsumers consumerMultiplexer, ExecutorService threadPool, - Executor executor, + long maxThrottlingDelayMs, SamzaContainerMetrics containerMetrics, TaskConfig config) { @@ -81,10 +80,10 @@ public class RunLoopFactory { streamTaskInstances, consumerMultiplexer, containerMetrics, + maxThrottlingDelayMs, taskWindowMs, taskCommitMs, - defaultClock(), - executor); + defaultClock()); } else { Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1)); @@ -106,6 +105,7 @@ public class RunLoopFactory { taskWindowMs, taskCommitMs, callbackTimeout, + maxThrottlingDelayMs, containerMetrics); } } http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java index 21fbca2..7221318 100644 --- a/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java +++ b/samza-core/src/main/java/org/apache/samza/container/disk/WatermarkDiskQuotaPolicy.java @@ -19,7 +19,7 @@ package org.apache.samza.container.disk; -import org.apache.samza.util.ThrottlingExecutor; +import org.apache.samza.util.Throttleable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +105,7 @@ public class WatermarkDiskQuotaPolicy implements DiskQuotaPolicy { // Validate entries double lastHighWaterMark = 1.0; - double lastWorkFactor = ThrottlingExecutor.MAX_WORK_FACTOR; + double lastWorkFactor = Throttleable.MAX_WORK_FACTOR; for (int i = 0; i < entries.size(); ++i) { final Entry entry = entries.get(i); @@ -123,10 +123,10 @@ public class WatermarkDiskQuotaPolicy implements DiskQuotaPolicy { dumpPolicyEntries(entries)); } - if (entry.getWorkFactor() < ThrottlingExecutor.MIN_WORK_FACTOR) { + if (entry.getWorkFactor() < Throttleable.MIN_WORK_FACTOR) { throw new IllegalArgumentException("Policy entry " + i + " has work factor (" + entry.getWorkFactor() + - ") < minimum work factor (" + ThrottlingExecutor.MIN_WORK_FACTOR + "):" + + ") < minimum work factor (" + Throttleable.MIN_WORK_FACTOR + "):" + dumpPolicyEntries(entries)); } http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 77eceea..8fac815 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -42,6 +42,8 @@ import org.apache.samza.container.TaskName; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Throttleable; +import org.apache.samza.util.ThrottlingScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; @@ -50,7 +52,7 @@ import scala.collection.JavaConversions; /** * The AsyncRunLoop supports multithreading execution of Samza {@link AsyncStreamTask}s. */ -public class AsyncRunLoop implements Runnable { +public class AsyncRunLoop implements Runnable, Throttleable { private static final Logger log = LoggerFactory.getLogger(AsyncRunLoop.class); private final Map<TaskName, AsyncTaskWorker> taskWorkers; @@ -67,6 +69,7 @@ public class AsyncRunLoop implements Runnable { private final SamzaContainerMetrics containerMetrics; private final ScheduledExecutorService workerTimer; private final ScheduledExecutorService callbackTimer; + private final ThrottlingScheduler callbackExecutor; private volatile boolean shutdownNow = false; private volatile Throwable throwable = null; @@ -77,6 +80,7 @@ public class AsyncRunLoop implements Runnable { long windowMs, long commitMs, long callbackTimeoutMs, + long maxThrottlingDelayMs, SamzaContainerMetrics containerMetrics) { this.threadPool = threadPool; @@ -87,6 +91,7 @@ public class AsyncRunLoop implements Runnable { this.maxConcurrency = maxConcurrency; this.callbackTimeoutMs = callbackTimeoutMs; this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null; + this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs); this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet()); this.latch = new Object(); this.workerTimer = Executors.newSingleThreadScheduledExecutor(); @@ -159,10 +164,21 @@ public class AsyncRunLoop implements Runnable { } } finally { workerTimer.shutdown(); + callbackExecutor.shutdown(); if (callbackTimer != null) callbackTimer.shutdown(); } } + @Override + public void setWorkFactor(double workFactor) { + callbackExecutor.setWorkFactor(workFactor); + } + + @Override + public double getWorkFactor() { + return callbackExecutor.getWorkFactor(); + } + public void shutdown() { shutdownNow = true; } @@ -480,30 +496,36 @@ public class AsyncRunLoop implements Runnable { * * @param callback AsyncSteamTask.processAsync callback */ @Override - public void onComplete(TaskCallback callback) { - try { - state.doneProcess(); - TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; - containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs); - log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); - - TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true); - if (callbackToUpdate != null) { - IncomingMessageEnvelope envelope = callbackToUpdate.envelope; - log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); - - // update offset - task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset()); - - // update coordinator - coordinatorRequests.update(callbackToUpdate.coordinator); + public void onComplete(final TaskCallback callback) { + long workNanos = System.nanoTime() - ((TaskCallbackImpl) callback).timeCreatedNs; + callbackExecutor.schedule(new Runnable() { + @Override + public void run() { + try { + state.doneProcess(); + TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; + containerMetrics.processNs().update(System.nanoTime() - callbackImpl.timeCreatedNs); + log.trace("Got callback complete for task {}, ssp {}", callbackImpl.taskName, callbackImpl.envelope.getSystemStreamPartition()); + + TaskCallbackImpl callbackToUpdate = callbackManager.updateCallback(callbackImpl, true); + if (callbackToUpdate != null) { + IncomingMessageEnvelope envelope = callbackToUpdate.envelope; + log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset()); + + // update offset + task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset()); + + // update coordinator + coordinatorRequests.update(callbackToUpdate.coordinator); + } + } catch (Throwable t) { + log.error(t.getMessage(), t); + abort(t); + } finally { + resume(); + } } - } catch (Throwable t) { - log.error(t.getMessage(), t); - abort(t); - } finally { - resume(); - } + }, workNanos); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/util/Throttleable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/Throttleable.java b/samza-core/src/main/java/org/apache/samza/util/Throttleable.java new file mode 100644 index 0000000..8d1d8ea --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/Throttleable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +/** + * An object that performs work and optionally slows the rate of execution. By default, + * work will not be throttled. Work can be throttled by setting work factor to less than + * {@link #MAX_WORK_FACTOR}. + */ +public interface Throttleable { + double MAX_WORK_FACTOR = 1.0; + double MIN_WORK_FACTOR = 0.001; + + /** + * Sets the work factor for this object. A work factor of {@code 1.0} indicates that execution + * should proceed at full throughput. A work factor of less than {@code 1.0} will introduce + * delays into the execution to approximate the requested work factor. For example, if the + * work factor is {@code 0.7} then approximately 70% of the execution time will be spent + * executing the work while 30% will be spent idle. + * + * @param workFactor the work factor to set for this throttler. + */ + void setWorkFactor(double workFactor); + + /** + * Returns the current work factor in use. + * @see #setWorkFactor(double) + * @return the current work factor. + */ + double getWorkFactor(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java index afcc4c5..d1298fc 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java +++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java @@ -30,10 +30,7 @@ import java.util.concurrent.TimeUnit; * This class is *NOT* thread-safe. It is intended to be used from a single thread. However, the * work factor may be set from any thread. */ -public class ThrottlingExecutor implements Executor { - public static final double MAX_WORK_FACTOR = 1.0; - public static final double MIN_WORK_FACTOR = 0.001; - +public class ThrottlingExecutor implements Throttleable, Executor { private final long maxDelayNanos; private final HighResolutionClock clock; @@ -54,7 +51,7 @@ public class ThrottlingExecutor implements Executor { * is less than 1.0) this command may optionally insert a delay before returning to satisfy the * requested work factor. * <p> - * This method will not operate correct if used by more than one thread. + * This method will not operate correctly if used by more than one thread. * * @param command the work to execute */ @@ -85,15 +82,7 @@ public class ThrottlingExecutor implements Executor { } } - /** - * Sets the work factor for this executor. A work factor of {@code 1.0} indicates that execution - * should proceed at full throughput. A work factor of less than {@code 1.0} will introduce - * delays into the {@link #execute(Runnable)} call to approximate the requested work factor. For - * example, if the work factor is {@code 0.7} then approximately 70% of the execute call will be - * spent executing the supplied command while 30% will be spent idle. - * - * @param workFactor the work factor to set for this executor. - */ + @Override public void setWorkFactor(double workFactor) { if (workFactor < MIN_WORK_FACTOR) { throw new IllegalArgumentException("Work factor must be >= " + MIN_WORK_FACTOR); @@ -105,11 +94,7 @@ public class ThrottlingExecutor implements Executor { workToIdleFactor = (1.0 - workFactor) / workFactor; } - /** - * Returns the current work factor in use. - * @see #setWorkFactor(double) - * @return the current work factor. - */ + @Override public double getWorkFactor() { return 1.0 / (workToIdleFactor + 1.0); } @@ -137,4 +122,4 @@ public class ThrottlingExecutor implements Executor { void setPendingNanos(long pendingNanos) { this.pendingNanos = pendingNanos; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java new file mode 100644 index 0000000..5b5780b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * An object that schedules work to be performed and optionally slows the rate of execution. + * By default work submitted with {@link #schedule(Runnable, long)}} will not be throttled. Work can + * be throttled by invoking {@link #setWorkFactor(double)}. + * <p> + * This class is thread-safe. It must be {@link #shutdown} after use. + */ +public class ThrottlingScheduler implements Throttleable { + private final long maxDelayNanos; + private final ScheduledExecutorService scheduledExecutorService; + private final HighResolutionClock clock; + + private final AtomicLong pendingNanos = new AtomicLong(); + private volatile double workToIdleFactor; + + public ThrottlingScheduler(long maxDelayMillis) { + this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis); + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + this.clock = new SystemHighResolutionClock(); + } + + ThrottlingScheduler(long maxDelayMillis, ScheduledExecutorService scheduledExecutorService, + HighResolutionClock clock) { + this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis); + this.scheduledExecutorService = scheduledExecutorService; + this.clock = clock; + } + + /** + * This method may be used to throttle asynchronous processing by delaying the work completion callback. + * <p> + * Executes the given completion callback on the current thread. If throttling is enabled (the work factor + * is less than 1.0) this method may optionally schedule the callback with a delay to satisfy the + * requested work factor. + * + * @param callback the callback to complete asynchronous work + * @param workDurationNs the duration of asynchronous work in nanoseconds + */ + public void schedule(final Runnable callback, final long workDurationNs) { + final double currentWorkToIdleFactor = workToIdleFactor; + + // If we're not throttling, do not get clock time, etc. This substantially reduces the overhead + // per invocation of this feature. + if (currentWorkToIdleFactor == 0.0) { + callback.run(); + } else { + final long delay = Math.min(maxDelayNanos, (long) (workDurationNs * currentWorkToIdleFactor)); + + // NOTE: we accumulate pending delay nanos here, but reduce the pending delay nanos after + // the delay operation (if applicable), so they do not continue to grow. + addToPendingNanos(delay); + + if (pendingNanos.get() < 0) { + callback.run(); + } else { + final long startTimeNs = clock.nanoTime(); + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + final long actualDelay = clock.nanoTime() - startTimeNs; + addToPendingNanos(-actualDelay); + callback.run(); + } + }, delay, TimeUnit.NANOSECONDS); + } + } + } + + private void addToPendingNanos(final long amount) { + long currentValue; + long newValue; + do { + currentValue = pendingNanos.get(); + newValue = Util.clampAdd(currentValue, amount); + } while (!pendingNanos.compareAndSet(currentValue, newValue)); + } + + @Override + public void setWorkFactor(double workFactor) { + if (workFactor < MIN_WORK_FACTOR) { + throw new IllegalArgumentException("Work factor must be >= " + MIN_WORK_FACTOR); + } + if (workFactor > MAX_WORK_FACTOR) { + throw new IllegalArgumentException("Work factor must be <= " + MAX_WORK_FACTOR); + } + + workToIdleFactor = (1.0 - workFactor) / workFactor; + } + + @Override + public double getWorkFactor() { + return 1.0 / (workToIdleFactor + 1.0); + } + + public void shutdown() { + scheduledExecutorService.shutdown(); + } + + /** + * Returns the total amount of delay (in nanoseconds) that needs to be applied to subsequent work. + * Alternatively this can be thought to capture the error between expected delay and actual + * applied delay. This accounts for variance in the precision of the delay mechanism, + * which may vary from platform to platform. + * <p> + * This is required for test purposes only. + * + * @return the total amount of delay (in nanoseconds) that needs to be applied to subsequent work. + */ + long getPendingNanos() { + return pendingNanos.get(); + } + + /** + * A convenience method for test that allows the pending delay for this executor to be set + * explicitly. + * + * @param pendingNanos the pending nanos to set. + */ + void setPendingNanos(long pendingNanos) { + this.pendingNanos.set(pendingNanos); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 538ebb8..7df7d88 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -19,14 +19,11 @@ package org.apache.samza.container -import java.util.concurrent.Executor - import org.apache.samza.task.CoordinatorRequests import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition} import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.StreamTask -import org.apache.samza.util.Logging -import org.apache.samza.util.TimerUtils +import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, TimerUtils} import scala.collection.JavaConversions._ @@ -43,12 +40,13 @@ class RunLoop ( val taskInstances: Map[TaskName, TaskInstance[StreamTask]], val consumerMultiplexer: SystemConsumers, val metrics: SamzaContainerMetrics, + val maxThrottlingDelayMs: Long, val windowMs: Long = -1, val commitMs: Long = 60000, - val clock: () => Long = { System.nanoTime }, - val executor: Executor = new SameThreadExecutor()) extends Runnable with TimerUtils with Logging { + val clock: () => Long = { System.nanoTime }) extends Runnable with Throttleable with TimerUtils with Logging { private val metricsMsOffset = 1000000L + private val executor = new ThrottlingExecutor(maxThrottlingDelayMs) private var lastWindowNs = clock() private var lastCommitNs = clock() private var activeNs = 0L @@ -96,7 +94,11 @@ class RunLoop ( } } - def shutdown = { + def setWorkFactor(workFactor: Double): Unit = executor.setWorkFactor(workFactor) + + def getWorkFactor: Double = executor.getWorkFactor + + def shutdown: Unit = { shutdownNow = true } http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 4ab4bce..e0468ee 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -44,7 +44,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor import org.apache.samza.container.disk.DiskSpaceMonitor.Listener import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor -import org.apache.samza.container.host.{SystemMemoryStatistics, SystemStatisticsMonitor, StatisticsMonitorImpl} +import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor} import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel @@ -74,10 +74,7 @@ import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.AsyncStreamTaskAdapter import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.util.ExponentialSleepStrategy -import org.apache.samza.util.Logging -import org.apache.samza.util.ThrottlingExecutor -import org.apache.samza.util.Util +import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Throttleable, Util} import scala.collection.JavaConversions._ @@ -554,8 +551,15 @@ object SamzaContainer extends Logging { (taskName, taskInstance) }).toMap - val executor = new ThrottlingExecutor( - config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1))) + val maxThrottlingDelayMs = config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)) + + val runLoop = RunLoopFactory.createRunLoop( + taskInstances, + consumerMultiplexer, + taskThreadPool, + maxThrottlingDelayMs, + samzaContainerMetrics, + config) val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl() memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener { @@ -582,8 +586,8 @@ object SamzaContainer extends Logging { diskSpaceMonitor.registerListener(new Listener { override def onUpdate(diskUsageBytes: Long): Unit = { val newWorkRate = diskQuotaPolicy.apply(1.0 - (diskUsageBytes.toDouble / diskQuotaBytes)) - executor.setWorkFactor(newWorkRate) - samzaContainerMetrics.executorWorkFactor.set(executor.getWorkFactor) + runLoop.asInstanceOf[Throttleable].setWorkFactor(newWorkRate) + samzaContainerMetrics.executorWorkFactor.set(runLoop.asInstanceOf[Throttleable].getWorkFactor) samzaContainerMetrics.diskUsageBytes.set(diskUsageBytes) } }) @@ -593,13 +597,6 @@ object SamzaContainer extends Logging { info(s"Disk quotas disabled because polling interval is not set ($DISK_POLL_INTERVAL_KEY)") } - val runLoop = RunLoopFactory.createRunLoop( - taskInstances, - consumerMultiplexer, - taskThreadPool, - executor, - samzaContainerMetrics, - config) info("Samza container setup complete.") @@ -862,7 +859,6 @@ class SamzaContainer( offsetManager.stop } - def shutdownMetrics { info("Shutting down metrics reporters.") @@ -896,6 +892,4 @@ class SamzaContainer( hostStatisticsMonitor.stop() } } - - } http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 3263e54..6000ffa 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.Partition; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.container.TaskInstance; @@ -63,7 +62,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestAsyncRunLoop { - Map<TaskName, TaskInstance<AsyncStreamTask>> tasks; ExecutorService executor; SystemConsumers consumerMultiplexer; @@ -72,6 +70,7 @@ public class TestAsyncRunLoop { long windowMs; long commitMs; long callbackTimeoutMs; + long maxThrottlingDelayMs; int maxMessagesInFlight; TaskCoordinator.RequestScope commitRequest; TaskCoordinator.RequestScope shutdownRequest; @@ -101,6 +100,7 @@ public class TestAsyncRunLoop { windowMs, commitMs, callbackTimeoutMs, + maxThrottlingDelayMs, containerMetrics); } @@ -116,7 +116,6 @@ public class TestAsyncRunLoop { return createTaskInstance(task, taskName, ssp, offsetManager, consumerMultiplexer); } - ExecutorService callbackExecutor; void triggerCallback(final TestTask task, final TaskCallback callback, final boolean success) { callbackExecutor.submit(new Runnable() { @@ -141,7 +140,6 @@ public class TestAsyncRunLoop { void run(TaskCallback callback); } - class TestTask implements AsyncStreamTask, WindowableTask, EndOfStreamListenerTask { boolean shutdown = false; boolean commit = false; @@ -193,7 +191,6 @@ public class TestAsyncRunLoop { } } - @Before public void setup() { executor = null; @@ -217,7 +214,6 @@ public class TestAsyncRunLoop { tasks.put(taskName1, t1); } - @Test public void testProcessMultipleTasks() throws Exception { AsyncRunLoop runLoop = createRunLoop(); @@ -234,7 +230,6 @@ public class TestAsyncRunLoop { assertEquals(2L, containerMetrics.processes().getCount()); } - @Test public void testProcessInOrder() throws Exception { AsyncRunLoop runLoop = createRunLoop(); @@ -251,7 +246,6 @@ public class TestAsyncRunLoop { assertEquals(3L, containerMetrics.processes().getCount()); } - private TestCode buildOutofOrderCallback() { final CountDownLatch latch = new CountDownLatch(1); return new TestCode() { @@ -447,8 +441,6 @@ public class TestAsyncRunLoop { TestTask mockStreamTask1 = new TestTask(true, false, false); TestTask mockStreamTask2 = new TestTask(true, false, false); - Config config = new MapConfig(); - Partition p1 = new Partition(1); Partition p2 = new Partition(2); SystemStreamPartition ssp1 = new SystemStreamPartition("system1", "stream1", p1); @@ -464,8 +456,6 @@ public class TestAsyncRunLoop { messageList.add(envelope3); sspMap.put(ssp2, messageList); - - SystemConsumer mockConsumer = mock(SystemConsumer.class); when(mockConsumer.poll((Set<SystemStreamPartition>) anyObject(), anyLong())).thenReturn(sspMap); @@ -496,20 +486,17 @@ public class TestAsyncRunLoop { taskInstance2.registerConsumers(); consumers.start(); - AsyncRunLoop runLoop = new AsyncRunLoop(tasks, + AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumers, maxMessagesInFlight, windowMs, commitMs, callbackTimeoutMs, + maxThrottlingDelayMs, containerMetrics); - runLoop.run(); callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); - - - } } http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java new file mode 100644 index 0000000..c41de70 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingScheduler.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import static junit.framework.Assert.*; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import org.junit.Test; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TestThrottlingScheduler { + private static final long MAX_NANOS = Long.MAX_VALUE; + + private static final Runnable NO_OP = new Runnable() { + @Override + public void run() { + // Do nothing. + } + }; + + private HighResolutionClock clock; + private ScheduledExecutorService scheduledExecutorService; + private ThrottlingScheduler throttler; + + @Before + public void setUp() { + clock = Mockito.mock(HighResolutionClock.class); + scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class); + throttler = new ThrottlingScheduler(MAX_NANOS, scheduledExecutorService, clock); + } + + @Test + public void testInitialState() { + ThrottlingExecutor throttler = new ThrottlingExecutor(MAX_NANOS); + assertEquals(0, throttler.getPendingNanos()); + assertEquals(1.0, throttler.getWorkFactor()); + } + + @Test + public void testSetWorkRate() { + throttler.setWorkFactor(1.0); + assertEquals(1.0, throttler.getWorkFactor()); + + throttler.setWorkFactor(0.5); + assertEquals(0.5, throttler.getWorkFactor()); + + throttler.setWorkFactor(Throttleable.MIN_WORK_FACTOR); + assertEquals(Throttleable.MIN_WORK_FACTOR, throttler.getWorkFactor()); + } + + @Test(expected = IllegalArgumentException.class) + public void testLessThan0PercentWorkRate() { + new ThrottlingExecutor(MAX_NANOS).setWorkFactor(-0.1); + } + + @Test(expected = IllegalArgumentException.class) + public void testGreaterThan100PercentWorkRate() { + new ThrottlingExecutor(MAX_NANOS).setWorkFactor(1.1); + } + + @Test + public void test100PercentWorkRate() throws InterruptedException { + throttler.schedule(NO_OP, 1000); + + assertEquals(0L, throttler.getPendingNanos()); + + // At 100% work rate schedule should not be called + Mockito.verify(scheduledExecutorService, Mockito.never()) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } + + @Test + public void test50PercentWorkRate() throws InterruptedException { + throttler.setWorkFactor(0.5); + + final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5); + setActualDelay(workTimeNanos); + + throttler.schedule(NO_OP, workTimeNanos); + + // Delay time is same as work time at 50% work rate + verifyRequestedDelay(workTimeNanos); + assertEquals(0L, throttler.getPendingNanos()); + } + + @Test + public void testMinWorkRate() throws InterruptedException { + final double workFactor = Throttleable.MIN_WORK_FACTOR; + throttler.setWorkFactor(workFactor); + + // The math to work out how much to multiply work time to get expected delay time + double workToDelayFactor = (1.0 - workFactor) / workFactor; + + final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5); + final long delayTimeNanos = (long) (workToDelayFactor * workTimeNanos); + + setActualDelay(delayTimeNanos); + throttler.schedule(NO_OP, workTimeNanos); + + verifyRequestedDelay(delayTimeNanos); + assertEquals(0, throttler.getPendingNanos()); + } + + @Test + public void testDelayOvershoot() throws InterruptedException { + throttler.setWorkFactor(0.5); + + final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5); + final long expectedDelayNanos = workTimeNanos; + final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(6); + + setActualDelay(actualDelayNanos); + + throttler.schedule(NO_OP, workTimeNanos); + + verifyRequestedDelay(expectedDelayNanos); + assertEquals(expectedDelayNanos - actualDelayNanos, throttler.getPendingNanos()); + } + + @Test + public void testDelayUndershoot() throws InterruptedException { + throttler.setWorkFactor(0.5); + + final long workTimeNanos = TimeUnit.MILLISECONDS.toNanos(5); + final long expectedDelayNanos = workTimeNanos; + final long actualDelayNanos = TimeUnit.MILLISECONDS.toNanos(4); + + setActualDelay(actualDelayNanos); + + throttler.schedule(NO_OP, workTimeNanos); + + verifyRequestedDelay(expectedDelayNanos); + assertEquals(expectedDelayNanos - actualDelayNanos, throttler.getPendingNanos()); + } + + @Test + public void testClampDelayMillis() throws InterruptedException { + final long maxDelayMillis = 10; + final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis); + + ThrottlingScheduler throttler = new ThrottlingScheduler(maxDelayMillis, scheduledExecutorService, clock); + throttler.setWorkFactor(0.5); + + setActualDelay(maxDelayNanos); + + // Note work time exceeds maxDelayMillis + throttler.schedule(NO_OP, TimeUnit.MILLISECONDS.toNanos(100)); + + verifyRequestedDelay(maxDelayNanos); + assertEquals(0L, throttler.getPendingNanos()); + } + + @Test + public void testDecreaseWorkFactor() { + throttler.setWorkFactor(0.5); + throttler.setPendingNanos(5000); + + throttler.setWorkFactor(0.3); + assertEquals(5000, throttler.getPendingNanos()); + } + + @Test + public void testOverflowOfDelayNanos() throws InterruptedException { + throttler.setWorkFactor(0.5); + throttler.setPendingNanos(Long.MAX_VALUE); + assertEquals(Long.MAX_VALUE, throttler.getPendingNanos()); + + // At a 50% work factor we'd expect work and delay to match. The function will try + // to increment the pending delay nanos, which could (but should not) result in overflow. + long workDurationNs = 5000; + setActualDelay(workDurationNs); + throttler.schedule(NO_OP, workDurationNs); + verifyRequestedDelay(workDurationNs); + + // Expect delay nanos to be clamped during accumulation, and decreased by expected delay at the end. + assertEquals(Long.MAX_VALUE - workDurationNs, throttler.getPendingNanos()); + } + + @Test + public void testNegativePendingNanos() throws InterruptedException { + throttler.setWorkFactor(0.5); + throttler.setPendingNanos(-1000); + assertEquals(-1000, throttler.getPendingNanos()); + + // Note: we do not expect the delay time to be used because work time + pending delay is + // negative. + throttler.schedule(NO_OP, 500); + + // Should not be delayed with negative pending nanos + Mockito.verify(scheduledExecutorService, Mockito.never()) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + assertEquals(-1000 + 500, throttler.getPendingNanos()); + } + + @Test + public void testNegativePendingNanosGoesPositive() throws InterruptedException { + throttler.setWorkFactor(0.5); + long startPendingNanos = -1000; + throttler.setPendingNanos(startPendingNanos); + assertEquals(-1000, throttler.getPendingNanos()); + + setActualDelay(1250); + + // We request a delay greater than the starting pending delay. + throttler.schedule(NO_OP, 1250); + + verifyRequestedDelay(1250); + + // Final pending delay should equal initial pending delay since we delay + // for the exact requested amount. + assertEquals(startPendingNanos, throttler.getPendingNanos()); + } + + private void setActualDelay(long actualDelayNs) { + Mockito.when(clock.nanoTime()).thenReturn(0L).thenReturn(actualDelayNs); + } + + private void verifyRequestedDelay(long expectedDelayTimeNanos) throws InterruptedException { + ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + Mockito.verify(scheduledExecutorService) + .schedule(runnableCaptor.capture(), Mockito.eq(expectedDelayTimeNanos), Mockito.eq(TimeUnit.NANOSECONDS)); + runnableCaptor.getValue().run(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index aa1a8d6..d83c7e2 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -20,6 +20,8 @@ package org.apache.samza.container +import java.util.concurrent.TimeUnit + import org.apache.samza.Partition import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.metrics.SlidingTimeWindowReservoir @@ -69,7 +71,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat def testProcessMessageFromChooser { val taskInstances = getMockTaskInstances val consumers = mock[SystemConsumers] - val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics) + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, TimeUnit.SECONDS.toMillis(1)) when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop) intercept[StopRunLoop] { runLoop.run } @@ -83,7 +85,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat def testNullMessageFromChooser { val consumers = mock[SystemConsumers] val map = getMockTaskInstances - taskName1 // This test only needs p0 - val runLoop = new RunLoop(map, consumers, new SamzaContainerMetrics) + val runLoop = new RunLoop(map, consumers, new SamzaContainerMetrics, TimeUnit.SECONDS.toMillis(1)) when(consumers.choose()).thenReturn(null).thenReturn(null).thenThrow(new StopRunLoop) intercept[StopRunLoop] { runLoop.run } runLoop.metrics.envelopes.getCount should equal(0L) @@ -100,6 +102,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat taskInstances = getMockTaskInstances, consumerMultiplexer = consumers, metrics = new SamzaContainerMetrics, + TimeUnit.SECONDS.toMillis(1), windowMs = 60000, // call window once per minute commitMs = 30000, // call commit twice per minute clock = () => { @@ -120,7 +123,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat def testCommitCurrentTaskManually { val taskInstances = getMockTaskInstances val consumers = mock[SystemConsumers] - val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, + TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1) when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope1).thenThrow(new StopRunLoop) stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.commit(RequestScope.CURRENT_TASK)) @@ -134,7 +138,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat def testCommitAllTasksManually { val taskInstances = getMockTaskInstances val consumers = mock[SystemConsumers] - val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, + TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1) when(consumers.choose()).thenReturn(envelope0).thenThrow(new StopRunLoop) stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER)) @@ -148,7 +153,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat def testShutdownOnConsensus { val taskInstances = getMockTaskInstances val consumers = mock[SystemConsumers] - val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, + TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1) when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope0).thenReturn(envelope1) stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.shutdown(RequestScope.CURRENT_TASK)) @@ -163,7 +169,8 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat def testShutdownNow { val taskInstances = getMockTaskInstances val consumers = mock[SystemConsumers] - val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics, windowMs = -1, commitMs = -1) + val runLoop = new RunLoop(taskInstances, consumers, new SamzaContainerMetrics + , TimeUnit.SECONDS.toMillis(1), windowMs = -1, commitMs = -1) when(consumers.choose()).thenReturn(envelope0).thenReturn(envelope1) stubProcess(taskInstances(taskName0), (envelope, coordinator) => coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)) @@ -208,6 +215,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat taskInstances = getMockTaskInstances, consumerMultiplexer = consumers, metrics = testMetrics, + TimeUnit.SECONDS.toMillis(1), windowMs = 1L, commitMs = 1L, clock = () => { @@ -242,6 +250,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat taskInstances = getMockTaskInstances, consumerMultiplexer = consumers, metrics = testMetrics, + TimeUnit.SECONDS.toMillis(1), commitMs = 1L, windowMs = 1L, clock = () => { @@ -275,7 +284,7 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat when(ti2.systemStreamPartitions).thenReturn(Set(ssp1)) val mockTaskInstances = Map(taskName0 -> ti0, taskName1 -> ti1, new TaskName("2") -> ti2) - val runLoop = new RunLoop(mockTaskInstances, null, new SamzaContainerMetrics) + val runLoop = new RunLoop(mockTaskInstances, null, new SamzaContainerMetrics, TimeUnit.SECONDS.toMillis(1)) val expected = Map(ssp0 -> List(ti0), ssp1 -> List(ti1, ti2)) assertEquals(expected, runLoop.getSystemStreamPartitionToTaskInstancesMapping) } http://git-wip-us.apache.org/repos/asf/samza/blob/4aae9ad8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index cff6b96..5895037 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -19,23 +19,21 @@ package org.apache.samza.container -import java.net.SocketTimeoutException +import java.lang.Thread.UncaughtExceptionHandler import java.util +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import org.apache.samza.storage.TaskStorageManager -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.mock.MockitoSugar -import scala.collection.JavaConversions._ import org.apache.samza.Partition -import org.apache.samza.config.Config -import org.apache.samza.config.MapConfig +import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} +import org.apache.samza.config.{Config, MapConfig} import org.apache.samza.coordinator.JobModelManager -import org.apache.samza.coordinator.server.{ServletBase, HttpServer, JobServlet} +import org.apache.samza.coordinator.server.{HttpServer, JobServlet} import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel +import org.apache.samza.serializers._ +import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.StreamMetadataCache import org.apache.samza.system.SystemConsumer @@ -55,11 +53,13 @@ import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.junit.Assert._ import org.junit.Test -import org.scalatest.junit.AssertionsForJUnit -import java.lang.Thread.UncaughtExceptionHandler -import org.apache.samza.serializers._ -import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.junit.AssertionsForJUnit +import org.scalatest.mock.MockitoSugar + +import scala.collection.JavaConversions._ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @Test @@ -193,7 +193,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { val runLoop = new RunLoop( taskInstances = Map(taskName -> taskInstance), consumerMultiplexer = consumerMultiplexer, - metrics = new SamzaContainerMetrics) + metrics = new SamzaContainerMetrics, + maxThrottlingDelayMs = TimeUnit.SECONDS.toMillis(1)) val container = new SamzaContainer( containerContext = containerContext, taskInstances = Map(taskName -> taskInstance),
