Repository: kafka
Updated Branches:
  refs/heads/trunk 38a1b6055 -> af42c3789


HOTFIX: call consumer.poll() even when no task is assigned

StreamThread should keep calling consumer.poll() even when no task is assigned. 
This is necessary to get a task.

guozhangwang

Author: Yasuhiro Matsuda <yasuh...@confluent.io>

Reviewers: Guozhang Wang

Closes #373 from ymatsuda/no_task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af42c378
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af42c378
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af42c378

Branch: refs/heads/trunk
Commit: af42c37899e8fc66590e7f3c4893f8224441f6a8
Parents: 38a1b60
Author: Yasuhiro Matsuda <yasuh...@confluent.io>
Authored: Tue Oct 27 13:57:19 2015 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Oct 27 13:57:19 2015 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af42c378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index abc5c5d..0bf51d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -265,22 +265,29 @@ public class StreamThread extends Thread {
                     sensors.pollTimeSensor.record(endPoll - startPoll);
                 }
 
-                // try to process one record from each task
                 totalNumBuffered = 0;
-                requiresPoll = false;
 
-                for (StreamTask task : tasks.values()) {
-                    long startProcess = time.milliseconds();
+                if (!tasks.isEmpty()) {
+                    // try to process one record from each task
+                    requiresPoll = false;
 
-                    totalNumBuffered += task.process();
-                    requiresPoll = requiresPoll || task.requiresPoll();
+                    for (StreamTask task : tasks.values()) {
+                        long startProcess = time.milliseconds();
 
-                    sensors.processTimeSensor.record(time.milliseconds() - 
startProcess);
+                        totalNumBuffered += task.process();
+                        requiresPoll = requiresPoll || task.requiresPoll();
+
+                        sensors.processTimeSensor.record(time.milliseconds() - 
startProcess);
+                    }
+
+                    maybePunctuate();
+                    maybeCommit();
+                } else {
+                    // even when no task is assigned, we must poll to get a 
task.
+                    requiresPoll = true;
                 }
 
-                maybePunctuate();
                 maybeClean();
-                maybeCommit();
             }
         } catch (Exception e) {
             throw new KafkaException(e);

Reply via email to