Repository: crunch Updated Branches: refs/heads/master 901d0644d -> 628098317
CRUNCH-629: Kafka source pulling is aggressive Added some parentheses to force proper order of operations in KafkaRecordReader. Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/62809831 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/62809831 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/62809831 Branch: refs/heads/master Commit: 6280983179e9c690af69c2bf0e296b054122d724 Parents: 901d064 Author: Brian Tieman <[email protected]> Authored: Tue Dec 13 09:01:08 2016 -0600 Committer: Micah Whitacre <[email protected]> Committed: Tue Jan 3 12:48:02 2017 -0600 ---------------------------------------------------------------------- .../apache/crunch/kafka/inputformat/KafkaRecordReader.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/62809831/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java index 3ed799b..0c49c66 100644 --- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java @@ -163,11 +163,11 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> { } private Iterator<ConsumerRecord<K, V>> getRecords() { - if (recordIterator == null || !recordIterator.hasNext()) { + if ((recordIterator == null) || !recordIterator.hasNext()) { ConsumerRecords<K, V> records = null; int numTries = 0; boolean success = false; - while(!success && numTries < maxNumberAttempts) { + while(!success 
&& (numTries < maxNumberAttempts)) { try { records = getConsumer().poll(consumerPollTimeout); } catch (RetriableException re) { @@ -179,7 +179,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> { throw re; } } - if((records == null || records.isEmpty()) && hasPendingData()){ + if(((records == null) || records.isEmpty()) && hasPendingData()){ concurrentEmptyResponses++; LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}", concurrentEmptyResponses, maxConcurrentEmptyResponses); @@ -189,7 +189,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> { } concurrentEmptyResponses = 0; - if(records == null || records.isEmpty()){ + if((records == null) || records.isEmpty()){ LOG.info("No records retrieved from Kafka therefore nothing to iterate over."); }else{ LOG.info("Retrieved records from Kafka to iterate over.");
