Repository: samza Updated Branches: refs/heads/master 49dac97cb -> 1c0a60bb7
SAMZA-1021 : Remove the redundant poll waiting inside AsyncRunLoop blockIfBusy Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c0a60bb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c0a60bb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c0a60bb Branch: refs/heads/master Commit: 1c0a60bb74cc9188413ed99482e910e35359822d Parents: 49dac97 Author: Xinyu Liu <xi...@linkedin.com> Authored: Thu Sep 22 11:42:13 2016 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Thu Sep 22 11:46:51 2016 -0700 ---------------------------------------------------------------------- .../org/apache/samza/task/AsyncRunLoop.java | 27 +++++--------------- 1 file changed, 7 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1c0a60bb/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index a510bb0..9a21bf1 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -201,32 +201,23 @@ public class AsyncRunLoop implements Runnable { } /** - * Block the runloop thread if all tasks are busy. Due to limitation of non-blocking for the flow control, - * we block the run loop when there are no runnable tasks, or all tasks are idle (no pending messages) while - * chooser is empty too. When a task worker finishes or window/commit completes, it will resume the runloop. + * Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes, + * it will resume the runloop. 
*/ private void blockIfBusy(IncomingMessageEnvelope envelope) { synchronized (latch) { while (!shutdownNow && throwable == null) { for (AsyncTaskWorker worker : taskWorkers.values()) { - if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) { - // should continue running since the worker state is ready and there is either new message - // or some pending operations for the worker + if (worker.state.isReady()) { + // should continue running if any worker state is ready + // consumerMultiplexer will block on polling for empty partitions so it won't cause busy loop return; } } try { log.trace("Block loop thread"); - - if (envelope == null) { - // If the envelope is null then we will wait for a poll interval, otherwise next choose() will - // return null immediately and we will have a busy loop - latch.wait(consumerMultiplexer.pollIntervalMs()); - return; - } else { - latch.wait(); - } + latch.wait(); } catch (InterruptedException e) { throw new SamzaException("Run loop is interrupted", e); } @@ -531,10 +522,6 @@ public class AsyncRunLoop implements Runnable { } } - private boolean hasPendingOps() { - return !pendingEnvelopQueue.isEmpty() || needCommit || needWindow; - } - /** * Returns the next operation by this taskWorker */ @@ -616,4 +603,4 @@ public class AsyncRunLoop implements Runnable { return pendingEnvelope.envelope; } } -} \ No newline at end of file +}