Repository: incubator-beam
Updated Branches:
  refs/heads/master ea04e618e -> b0cb2e87b


[BEAM-744] UnboundedKafkaReader should return as soon as it can.

Use timeout directly in nextBatch()


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84c6649c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84c6649c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84c6649c

Branch: refs/heads/master
Commit: 84c6649cd63c33ca79ad43e8973dbf765e27a5d0
Parents: ea04e61
Author: Sela <ans...@paypal.com>
Authored: Tue Oct 18 22:03:25 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Wed Oct 19 21:34:07 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84c6649c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2030789..834104e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -756,9 +756,6 @@ public class KafkaIO {
     private Iterator<PartitionState> curBatch = Collections.emptyIterator();
 
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-    // how long to wait for new records from kafka consumer inside start()
-    private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
-    // how long to wait for new records from kafka consumer inside advance()
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
     // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
@@ -888,12 +885,13 @@ public class KafkaIO {
       LOG.info("{}: Returning from consumer pool loop", this);
     }
 
-    private void nextBatch(Duration timeout) {
+    private void nextBatch() {
       curBatch = Collections.emptyIterator();
 
       ConsumerRecords<byte[], byte[]> records;
       try {
-        records = availableRecordsQueue.poll(timeout.getMillis(),
+        // poll available records, wait (if necessary) up to the specified timeout.
+        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
                                              TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -966,9 +964,7 @@ public class KafkaIO {
           }
         }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
-      // Wait for longer than normal when fetching a batch to improve chances a record is available
-      // when start() returns.
-      nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
+      nextBatch();
       return advance();
     }
 
@@ -1032,7 +1028,7 @@ public class KafkaIO {
           return true;
 
         } else { // -- (b)
-          nextBatch(NEW_RECORDS_POLL_TIMEOUT);
+          nextBatch();
 
           if (!curBatch.hasNext()) {
             return false;