Repository: samza Updated Branches: refs/heads/master 13ff09024 -> 57fea260a
SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty. Author: James Lent <[email protected]> Reviewers: Yi Pan <[email protected]>, Xinyu Liu <[email protected]> Closes #436 from jwlent55/SD-1599-improve-async-run-loop-efficiency and squashes the following commits: ef0e53bb [James Lent] SD-1599: Remove incorrect call to containerMetrics left by previous update. 3899216e [James Lent] SD-1599: Combine the blockIfBusy and blockIfNoWork logic inside one method and a common latch. e3837809 [James Lent] SD-1599: Mark runLoopResumedSinceLastChecked as volatile. a7c0ac4c [James Lent] SD-1599: Explicitly set the timeout value to either 'noNewMessagesTimeout' or 0. b2dd61e2 [James Lent] SD-1599: Address the first set of code inspection comments. cc34518a [James Lent] SD-1599: Improve the efficiency of the AsyncRunLoop when some partitions are empty. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57fea260 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57fea260 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57fea260 Branch: refs/heads/master Commit: 57fea260a5dad24b287dad113c78bf23aa3ba8f9 Parents: 13ff090 Author: James Lent <[email protected]> Authored: Thu Mar 29 15:26:28 2018 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Mar 29 15:26:28 2018 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 12 ++++++ .../apache/samza/container/RunLoopFactory.java | 5 +++ .../org/apache/samza/task/AsyncRunLoop.java | 42 +++++++++++++++---- .../org/apache/samza/config/TaskConfig.scala | 7 ++++ .../apache/samza/system/SystemConsumers.scala | 7 +++- .../org/apache/samza/task/TestAsyncRunLoop.java | 44 +++++++++++++------- 6 files changed, 92 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 49886ce..5c41596 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -766,6 +766,18 @@ </tr> <tr> + <td class="property" id="task-max-idle-ms">task.max.idle.ms</td> + <td class="default">10</td> + <td class="description"> + The maximum time to wait for a task worker to complete when there are no new messages to handle before resuming the main + loop and potentially polling for more messages. See <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>. + This timeout value prevents the main loop from spinning when there is nothing for it to do. Increasing this value will reduce + the background load of the thread, but may also increase message latency. It should not be set greater than the + <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>. 
+ </td> + </tr> + + <tr> <td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td> <td class="default"></td> <td class="description"> http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index d399fd0..c9ec6b5 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -92,6 +92,10 @@ public class RunLoopFactory { log.info("Got callbackTimeout: {}.", callbackTimeout); + Long maxIdleMs = config.getMaxIdleMs(); + + log.info("Got maxIdleMs: {}.", maxIdleMs); + log.info("Run loop in asynchronous mode."); return new AsyncRunLoop( @@ -103,6 +107,7 @@ public class RunLoopFactory { taskCommitMs, callbackTimeout, maxThrottlingDelayMs, + maxIdleMs, containerMetrics, clock, isAsyncCommitEnabled); http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 589fbb8..f3c4655 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -67,6 +67,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private final long windowMs; private final long commitMs; private final long callbackTimeoutMs; + private final long maxIdleMs; private final SamzaContainerMetrics containerMetrics; private final ScheduledExecutorService workerTimer; private final ScheduledExecutorService 
callbackTimer; @@ -75,6 +76,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private volatile Throwable throwable = null; private final HighResolutionClock clock; private final boolean isAsyncCommitEnabled; + private volatile boolean runLoopResumedSinceLastChecked; public AsyncRunLoop(Map<TaskName, TaskInstance> taskInstances, ExecutorService threadPool, @@ -84,6 +86,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { long commitMs, long callbackTimeoutMs, long maxThrottlingDelayMs, + long maxIdleMs, SamzaContainerMetrics containerMetrics, HighResolutionClock clock, boolean isAsyncCommitEnabled) { @@ -95,6 +98,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { this.commitMs = commitMs; this.maxConcurrency = maxConcurrency; this.callbackTimeoutMs = callbackTimeoutMs; + this.maxIdleMs = maxIdleMs; this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null; this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs); this.coordinatorRequests = new CoordinatorRequests(taskInstances.keySet()); @@ -150,21 +154,21 @@ public class AsyncRunLoop implements Runnable, Throttleable { long startNs = clock.nanoTime(); IncomingMessageEnvelope envelope = chooseEnvelope(); - long chooseNs = clock.nanoTime(); + long chooseNs = clock.nanoTime(); containerMetrics.chooseNs().update(chooseNs - startNs); - runTasks(envelope); + blockIfBusyOrNoNewWork(envelope); long blockNs = clock.nanoTime(); + containerMetrics.blockNs().update(blockNs - chooseNs); - blockIfBusy(envelope); + runTasks(envelope); long currentNs = clock.nanoTime(); - long activeNs = blockNs - chooseNs; + long activeNs = currentNs - blockNs; long totalNs = currentNs - prevNs; prevNs = currentNs; - containerMetrics.blockNs().update(currentNs - blockNs); if (totalNs != 0) { // totalNs is not 0 if timer metrics are enabled @@ -233,14 +237,35 @@ public class AsyncRunLoop implements Runnable, Throttleable { /** * Block the runloop 
thread if all tasks are busy. When a task worker finishes or window/commit completes, * it will resume the runloop. + * + * In addition, delay the AsyncRunLoop thread for a short time if there are no new messages to process and the run loop + * has not been resumed since the last time this code was run. This will prevent the main thread from spinning when it + * has no work to distribute. If a task worker finishes or window/commit completes before the timeout then resume + * the AsyncRunLoop thread immediately. That event may allow a task worker to start processing a message that has already + * been chosen. In any event it should only delay for a short time. It needs to periodically check for new messages. */ - private void blockIfBusy(IncomingMessageEnvelope envelope) { + private void blockIfBusyOrNoNewWork(IncomingMessageEnvelope envelope) { synchronized (latch) { + + // First check to see if we should delay the run loop for a short time. The runLoopResumedSinceLastChecked boolean + // is used to ensure we don't delay if there may already be a task ready to dequeue a previously chosen/pending + // message. It is better to occasionally make one additional loop when there is no work to do then delay the + // runloop when there is work that could be started immediately. + if ((envelope == null) && !runLoopResumedSinceLastChecked) { + try { + log.trace("Start no work wait"); + latch.wait(maxIdleMs); + log.trace("End no work wait"); + } catch (InterruptedException e) { + throw new SamzaException("Run loop is interrupted", e); + } + } + runLoopResumedSinceLastChecked = false; + + // Next check to see if we should block if all the tasks are busy. 
while (!shutdownNow && throwable == null) { for (AsyncTaskWorker worker : taskWorkers) { if (worker.state.isReady()) { - // should continue running if any worker state is ready - // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop return; } } @@ -265,6 +290,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { } synchronized (latch) { latch.notifyAll(); + runLoopResumedSinceLastChecked = true; } } http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index fe03a52..206eb8f 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -44,11 +44,13 @@ object TaskConfig { val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a AsyncStreamTask + val MAX_IDLE_MS = "task.max.idle.ms" // maximum time to wait for a task worker to complete when there are no new messages to handle val DEFAULT_WINDOW_MS: Long = -1L val DEFAULT_COMMIT_MS = 60000L val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L val DEFAULT_MAX_CONCURRENCY: Int = 1 + val DEFAULT_MAX_IDLE_MS: Long = 10 /** * Samza's container polls for more messages under two conditions. 
The first @@ -155,4 +157,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { case Some(commitMs) => commitMs.toInt > 0 case _ => TaskConfig.DEFAULT_COMMIT_MS > 0 } + + def getMaxIdleMs: Long = getOption(TaskConfig.MAX_IDLE_MS) match { + case Some(ms) => ms.toLong + case _ => TaskConfig.DEFAULT_MAX_IDLE_MS + } } http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 3964ea3..49ab52a 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -215,8 +215,11 @@ class SystemConsumers ( metrics.choseNull.inc - // Sleep for a while so we don't poll in a tight loop. - timeout = noNewMessagesTimeout + // Sleep for a while so we don't poll in a tight loop, but, don't do this when called from the AsyncRunLoop + // code because in that case the chooser will not get updated with a new message for an SSP until after a + // message is processed, See how updateChooser variable is used below. The AsyncRunLoop has its own way to + // block when there is no work to process. 
+ timeout = if (updateChooser) noNewMessagesTimeout else 0 } else { val systemStreamPartition = envelopeFromChooser.getSystemStreamPartition http://git-wip-us.apache.org/repos/asf/samza/blob/57fea260/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 7f54614..d7132f3 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -69,6 +69,7 @@ public class TestAsyncRunLoop { private final long commitMs = -1; private final long callbackTimeoutMs = 0; private final long maxThrottlingDelayMs = 0; + private final long maxIdleMs = 10; private final Partition p0 = new Partition(0); private final Partition p1 = new Partition(1); private final TaskName taskName0 = new TaskName(p0.toString()); @@ -219,7 +220,7 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false); when(consumerMultiplexer.choose(false)) .thenReturn(envelope0) @@ -257,7 +258,8 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null); runLoop.run(); @@ -288,7 +290,8 @@ public class 
TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null); runLoop.run(); @@ -345,7 +348,8 @@ public class TestAsyncRunLoop { task0.callbackHandler = buildOutofOrderCallback(task0); AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null); runLoop.run(); @@ -374,7 +378,8 @@ public class TestAsyncRunLoop { long windowMs = 1; int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)).thenReturn(null); runLoop.run(); @@ -400,7 +405,8 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); //have a null message in between to make sure task0 finishes processing and invoke the commit 
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null); runLoop.run(); @@ -432,7 +438,8 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); //have a null message in between to make sure task0 finishes processing and invoke the commit when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null); runLoop.run(); @@ -467,7 +474,8 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); // consensus is reached after envelope1 is processed. 
when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null); runLoop.run(); @@ -500,7 +508,8 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)) .thenReturn(envelope0) .thenReturn(envelope1) @@ -540,7 +549,8 @@ public class TestAsyncRunLoop { task0.callbackHandler = buildOutofOrderCallback(task0); AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)) .thenReturn(envelope0) .thenReturn(envelope3) @@ -581,8 +591,9 @@ public class TestAsyncRunLoop { tasks.put(taskName1, t1); int maxMessagesInFlight = 1; - AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight , windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); when(consumerMultiplexer.choose(false)).thenReturn(envelope0) .thenReturn(envelope1) @@ -656,7 +667,8 @@ public class TestAsyncRunLoop { int maxMessagesInFlight = 1; AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumers, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, 
containerMetrics, + () -> 0L, false); runLoop.run(); } @@ -706,7 +718,8 @@ public class TestAsyncRunLoop { .thenReturn(null); AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, false); runLoop.run(); @@ -755,7 +768,8 @@ public class TestAsyncRunLoop { .thenReturn(envelope1) .thenReturn(null); AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, - callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, true); + callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, + () -> 0L, true); runLoop.run();
