Repository: incubator-beam
Updated Branches:
  refs/heads/master ea04e618e -> b0cb2e87b


[BEAM-744] UnboundedKafkaReader should return as soon as it can.

Use timeout directly in nextBatch()


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84c6649c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84c6649c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84c6649c

Branch: refs/heads/master
Commit: 84c6649cd63c33ca79ad43e8973dbf765e27a5d0
Parents: ea04e61
Author: Sela <ans...@paypal.com>
Authored: Tue Oct 18 22:03:25 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Wed Oct 19 21:34:07 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java    | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84c6649c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2030789..834104e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -756,9 +756,6 @@ public class KafkaIO {
     private Iterator<PartitionState> curBatch = Collections.emptyIterator();
 
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-    // how long to wait for new records from kafka consumer inside start()
-    private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
-    // how long to wait for new records from kafka consumer inside advance()
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
     // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
@@ -888,12 +885,13 @@ public class KafkaIO {
       LOG.info("{}: Returning from consumer pool loop", this);
     }
 
-    private void nextBatch(Duration timeout) {
+    private void nextBatch() {
       curBatch = Collections.emptyIterator();
 
       ConsumerRecords<byte[], byte[]> records;
       try {
-        records = availableRecordsQueue.poll(timeout.getMillis(),
+        // poll available records, wait (if necessary) up to the specified timeout.
+        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
                                              TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -966,9 +964,7 @@ public class KafkaIO {
             }
           }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
-      // Wait for longer than normal when fetching a batch to improve chances a record is available
-      // when start() returns.
-      nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
+      nextBatch();
       return advance();
     }
 
@@ -1032,7 +1028,7 @@ public class KafkaIO {
           return true;
 
         } else { // -- (b)
-          nextBatch(NEW_RECORDS_POLL_TIMEOUT);
+          nextBatch();
 
           if (!curBatch.hasNext()) {
             return false;

Reply via email to