Repository: samza Updated Branches: refs/heads/master e5f31c57c -> 2187d6bd9
SAMZA-973: Disk Quotas: clamp max delay and more accurate processing time measurement Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2187d6bd Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2187d6bd Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2187d6bd Branch: refs/heads/master Commit: 2187d6bd9d942e0d95189531c4b4db23f30c042b Parents: e5f31c5 Author: Chris Pettitt <[email protected]> Authored: Wed Jul 20 17:07:32 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Wed Jul 20 17:07:32 2016 -0700 ---------------------------------------------------------------------- .../apache/samza/util/ThrottlingExecutor.java | 15 ++++--- .../org/apache/samza/container/RunLoop.scala | 42 ++++++++++---------- .../apache/samza/container/SamzaContainer.scala | 6 ++- .../samza/util/TestThrottlingExecutor.java | 28 +++++++++++-- 4 files changed, 58 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java index 214cefd..afcc4c5 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java +++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java @@ -20,6 +20,7 @@ package org.apache.samza.util; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; /** * An object that performs work on the current thread and optionally slows the rate of execution. @@ -33,16 +34,18 @@ public class ThrottlingExecutor implements Executor { public static final double MAX_WORK_FACTOR = 1.0; public static final double MIN_WORK_FACTOR = 0.001; + private final long maxDelayNanos; private final HighResolutionClock clock; private volatile double workToIdleFactor; private long pendingNanos; - public ThrottlingExecutor() { - this(new SystemHighResolutionClock()); + public ThrottlingExecutor(long maxDelayMillis) { + this(maxDelayMillis, new SystemHighResolutionClock()); } - ThrottlingExecutor(HighResolutionClock clock) { + ThrottlingExecutor(long maxDelayMillis, HighResolutionClock clock) { + this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis); this.clock = clock; } @@ -68,8 +71,10 @@ public class ThrottlingExecutor implements Executor { final long workNanos = clock.nanoTime() - startWorkNanos; // NOTE: we accumulate pending delay nanos here, but we later update the pending nanos during - // the sleep operation (if applicable), so they do not continue to grow. - pendingNanos = Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)); + // the sleep operation (if applicable), so they do not continue to grow. We also clamp the + // maximum sleep time to prevent excessively large sleeps between executions. + pendingNanos = Math.min(maxDelayNanos, + Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor))); if (pendingNanos > 0) { try { pendingNanos = clock.sleep(pendingNanos); http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index bb2c376..538ebb8 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -21,9 +21,8 @@ package org.apache.samza.container import java.util.concurrent.Executor -import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.CoordinatorRequests +import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition} import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.StreamTask import org.apache.samza.util.Logging @@ -74,20 +73,26 @@ class RunLoop ( * unhandled exception is thrown. */ def run { - val runTask = new Runnable() { - override def run(): Unit = { - val loopStartTime = clock() - process - window - commit - val totalNs = clock() - loopStartTime - metrics.utilization.set(activeNs.toFloat / totalNs) - activeNs = 0L + while (!shutdownNow) { + val loopStartTime = clock() + + trace("Attempting to choose a message to process.") + + // Exclude choose time from activeNs. Although it includes deserialization time, + // it most closely captures idle time. + val envelope = updateTimer(metrics.chooseNs) { + consumerMultiplexer.choose() } - } - while (!shutdownNow) { - executor.execute(runTask) + executor.execute(new Runnable() { + override def run(): Unit = process(envelope) + }) + + window + commit + val totalNs = clock() - loopStartTime + metrics.utilization.set(activeNs.toFloat / totalNs) + activeNs = 0L } } @@ -99,16 +104,9 @@ class RunLoop ( * Chooses a message from an input stream to process, and calls the * process() method on the appropriate StreamTask to handle it. */ - private def process { - trace("Attempting to choose a message to process.") + private def process(envelope: IncomingMessageEnvelope) { metrics.processes.inc - // Exclude choose time from activeNs. Although it includes deserialization time, - // it most closely captures idle time. - val envelope = updateTimer(metrics.chooseNs) { - consumerMultiplexer.choose() - } - activeNs += updateTimerAndGetDuration(metrics.processNs) ((currentTimeNs: Long) => { if (envelope != null) { val ssp = envelope.getSystemStreamPartition http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index b8600d5..90d7279 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -28,7 +28,8 @@ import java.util import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit - +import java.lang.Thread.UncaughtExceptionHandler +import java.net.{URL, UnknownHostException} import org.apache.samza.SamzaException import org.apache.samza.checkpoint.CheckpointManagerFactory import org.apache.samza.checkpoint.OffsetManager @@ -552,7 +553,8 @@ object SamzaContainer extends Logging { (taskName, taskInstance) }).toMap - val executor = new ThrottlingExecutor() + val executor = new ThrottlingExecutor( + config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1))) val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue) samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes) http://git-wip-us.apache.org/repos/asf/samza/blob/2187d6bd/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java index 2659050..0276e6b 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestThrottlingExecutor.java @@ -29,6 +29,8 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; public class TestThrottlingExecutor { + private static final long MAX_NANOS = Long.MAX_VALUE; + private static final Runnable NO_OP = new Runnable() { @Override public void run() { @@ -42,12 +44,12 @@ public class TestThrottlingExecutor { @Before public void setUp() { clock = Mockito.mock(HighResolutionClock.class); - executor = new ThrottlingExecutor(clock); + executor = new ThrottlingExecutor(MAX_NANOS, clock); } @Test public void testInitialState() { - ThrottlingExecutor throttler = new ThrottlingExecutor(); + ThrottlingExecutor throttler = new ThrottlingExecutor(MAX_NANOS); assertEquals(0, throttler.getPendingNanos()); assertEquals(1.0, throttler.getWorkFactor()); } @@ -66,12 +68,12 @@ public class TestThrottlingExecutor { @Test(expected = IllegalArgumentException.class) public void testLessThan0PercentWorkRate() { - new ThrottlingExecutor().setWorkFactor(-0.1); + new ThrottlingExecutor(MAX_NANOS).setWorkFactor(-0.1); } @Test(expected = IllegalArgumentException.class) public void testGreaterThan100PercentWorkRate() { - new ThrottlingExecutor().setWorkFactor(1.1); + new ThrottlingExecutor(MAX_NANOS).setWorkFactor(1.1); } @Test @@ -184,6 +186,24 @@ public class TestThrottlingExecutor { } @Test + public void testClampDelayMillis() throws InterruptedException { + final long maxDelayMillis = 10; + final long maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis); + + executor = new ThrottlingExecutor(maxDelayMillis, clock); + executor.setWorkFactor(0.5); + + // Note work time exceeds maxDelayMillis + setWorkTime(TimeUnit.MILLISECONDS.toNanos(100)); + setExpectedAndActualSleepTime(maxDelayNanos, maxDelayNanos); + + executor.execute(NO_OP); + + verifySleepTime(maxDelayNanos); + assertEquals(0L, executor.getPendingNanos()); + } + + @Test public void testDecreaseWorkFactor() { executor.setWorkFactor(0.5); executor.setPendingNanos(5000);
