Repository: kafka Updated Branches: refs/heads/trunk 38a1b6055 -> af42c3789
HOTFIX: call consumer.poll() even when no task is assigned StreamThread should keep calling consumer.poll() even when no task is assigned. This is necessary to get a task. guozhangwang Author: Yasuhiro Matsuda <yasuh...@confluent.io> Reviewers: Guozhang Wang Closes #373 from ymatsuda/no_task Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af42c378 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af42c378 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af42c378 Branch: refs/heads/trunk Commit: af42c37899e8fc66590e7f3c4893f8224441f6a8 Parents: 38a1b60 Author: Yasuhiro Matsuda <yasuh...@confluent.io> Authored: Tue Oct 27 13:57:19 2015 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Tue Oct 27 13:57:19 2015 -0700 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 25 +++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/af42c378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index abc5c5d..0bf51d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -265,22 +265,29 @@ public class StreamThread extends Thread { sensors.pollTimeSensor.record(endPoll - startPoll); } - // try to process one record from each task totalNumBuffered = 0; - requiresPoll = false; - for (StreamTask task : tasks.values()) { - long startProcess = time.milliseconds(); + if (!tasks.isEmpty()) { + // try to process one record from each task + requiresPoll = false; - totalNumBuffered += task.process(); - requiresPoll = requiresPoll || task.requiresPoll(); + for (StreamTask task : tasks.values()) { + long startProcess = time.milliseconds(); - sensors.processTimeSensor.record(time.milliseconds() - startProcess); + totalNumBuffered += task.process(); + requiresPoll = requiresPoll || task.requiresPoll(); + + sensors.processTimeSensor.record(time.milliseconds() - startProcess); + } + + maybePunctuate(); + maybeCommit(); + } else { + // even when no task is assigned, we must poll to get a task. + requiresPoll = true; } - maybePunctuate(); maybeClean(); - maybeCommit(); } } catch (Exception e) { throw new KafkaException(e);