Repository: crunch
Updated Branches:
  refs/heads/master 614364aa9 -> 41b201a9e
CRUNCH-662: Updated KafkaRecordReader to better handle errors, empty reads and appropriately retry

Signed-off-by: Josh Wills <jwi...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/41b201a9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/41b201a9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/41b201a9

Branch: refs/heads/master
Commit: 41b201a9e546f7860e09cb30f1b7cecd4507d267
Parents: 614364a
Author: Bryan Baugher <bryan.baug...@cerner.com>
Authored: Wed Jan 24 14:14:31 2018 -0600
Committer: Josh Wills <jwi...@apache.org>
Committed: Wed Jan 24 21:42:58 2018 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaUtils.java     |   2 +-
 .../crunch/kafka/record/KafkaRecordReader.java  | 106 +++++++++++--------
 2 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/41b201a9/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index 2ed6412..2681df0 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -69,7 +69,7 @@ public class KafkaUtils {
   /**
    * Default number of retry attempts.
    */
-  public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5;
+  public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 120;
 
   public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/41b201a9/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index ef2ec49..754c9ce 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -37,10 +37,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
 import static org.apache.crunch.kafka.record.KafkaInputFormat.filterConnectionProperties;
 import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
@@ -66,8 +64,6 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
   private long currentOffset;
   private int maxNumberAttempts;
   private Properties kafkaConnectionProperties;
-  private int maxConcurrentEmptyResponses;
-  private int concurrentEmptyResponses;
   private TopicPartition topicPartition;
 
   @Override
@@ -104,14 +100,12 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
     Configuration config = taskAttemptContext.getConfiguration();
     consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
     maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
-    maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
-    concurrentEmptyResponses = 0;
   }
 
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
     if (hasPendingData()) {
-      recordIterator = getRecords();
+      loadRecords();
       record = recordIterator.hasNext() ? recordIterator.next() : null;
       if (record != null) {
         LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset());
@@ -119,20 +113,46 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
         currentOffset = record.offset();
         LOG.debug("Current offset will be updated to be [{}]", currentOffset);
         if (LOG.isWarnEnabled() && (currentOffset - oldOffset > 1)) {
-          LOG.warn("Offset increment was larger than expected value of one, old {} new {}", oldOffset, currentOffset);
+          // The most likely scenario here is our start offset was deleted and an offset reset took place by the consumer
+          LOG.warn("Possible data loss in partition {} as offset {} was larger than expected {}",
+              new Object[] { topicPartition, currentOffset, oldOffset + 1});
         }
+
+        if (currentOffset >= endingOffset) {
+          // We had pending data but read a record beyond our end offset so don't include it and stop reading
+          if (LOG.isWarnEnabled())
+            LOG.warn("Record offset {} is beyond our end offset {}. This could indicate data loss in partition {}",
+                new Object[] { currentOffset, endingOffset, topicPartition});
+          record = null;
+          return false;
+        }
+
         return true;
+      } else if (isPartitionEmpty()) {
+        // If the partition is empty but we are expecting to read data we would read no records and
+        // see no errors so handle that here
+        if (LOG.isWarnEnabled())
+          LOG.warn("The partition {} is empty though we expected to read from {} to {}. This could indicate data loss",
+              new Object[] {topicPartition, currentOffset, endingOffset});
       } else {
-        LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset);
+        // We have pending data but we are unable to fetch any records so throw an exception and stop the job
+        throw new IOException("Unable to read additional data from Kafka. See logs for details. Partition "
+            + topicPartition + " Current Offset: " + currentOffset + " End Offset: " + endingOffset);
       }
     }
     record = null;
     return false;
   }
 
+  private boolean isPartitionEmpty() {
+    // We don't need to fetch the ending offset as well since that is looked up when running the job. If the end offset had
+    // changed while running we would have read that record instead of reading no records and calling this method
+    return getEarliestOffset() == endingOffset;
+  }
+
   @Override
   public ConsumerRecord<K, V> getCurrentKey() throws IOException, InterruptedException {
-    return record == null ? null : record;
+    return record;
   }
 
   @Override
@@ -149,56 +169,54 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
   private boolean hasPendingData() {
     //offset range is exclusive at the end which means the ending offset is one higher
     // than the actual physical last offset
-
-    boolean hasPending = currentOffset < endingOffset - 1;
-
-    if (concurrentEmptyResponses > maxConcurrentEmptyResponses) {
-      long earliest = getEarliestOffset();
-      if (earliest == endingOffset) {
-        LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.",
-            new Object[] { topicPartition, earliest, endingOffset, currentOffset });
-        return false;
-      }
-    }
-
-    return hasPending;
+    return currentOffset < endingOffset - 1;
   }
 
-  private Iterator<ConsumerRecord<K, V>> getRecords() {
+  /**
+   * Loads new records into the record iterator
+   */
+  private void loadRecords() {
     if ((recordIterator == null) || !recordIterator.hasNext()) {
       ConsumerRecords<K, V> records = null;
       int numTries = 0;
       boolean success = false;
-      while (!success && (numTries < maxNumberAttempts)) {
+      while(!success && (numTries < maxNumberAttempts)) {
+        numTries++;
        try {
          records = getConsumer().poll(consumerPollTimeout);
        } catch (RetriableException re) {
-          numTries++;
-          if (numTries < maxNumberAttempts) {
-            LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries + 1, re);
-          } else {
-            LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re);
-            throw re;
-          }
+          LOG.warn("Error pulling messages from Kafka", re);
        }
-        if (((records == null) || records.isEmpty()) && hasPendingData()) {
-          concurrentEmptyResponses++;
-          LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
-              concurrentEmptyResponses, maxConcurrentEmptyResponses);
-        } else {
+
+        // There was no error but we don't have any records. This could indicate a slowness or failure in Kafka's internal
+        // client or that the partition has no more data
+        if (records != null && records.isEmpty()){
+          if (LOG.isWarnEnabled())
+            LOG.warn("No records retrieved from partition {} with poll timeout {} but pending offsets to consume. Current "
+                + "Offset: {}, End Offset: {}", new Object[] { topicPartition, consumerPollTimeout, currentOffset,
+                endingOffset });
+        } else if (records != null && !records.isEmpty()){
           success = true;
         }
+
+        if (!success && numTries < maxNumberAttempts) {
+          LOG.info("Record fetch attempt {} / {} failed, retrying", numTries, maxNumberAttempts);
+        }
+        else if (!success) {
+          if (LOG.isWarnEnabled())
+            LOG.warn("Record fetch attempt {} / {} failed. No more attempts left for partition {}",
+                new Object[] { numTries, maxNumberAttempts, topicPartition });
+        }
       }
-      concurrentEmptyResponses = 0;
 
-      if ((records == null) || records.isEmpty()) {
-        LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
-      } else {
-        LOG.info("Retrieved records from Kafka to iterate over.");
+      if ((records == null) || records.isEmpty()){
+        LOG.info("No records retrieved from Kafka partition {} therefore nothing to iterate over", topicPartition);
+      } else{
+        LOG.info("Retrieved records {} from Kafka partition {} to iterate over", records.count(), topicPartition);
       }
-      return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
+
+      recordIterator = records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
     }
-    return recordIterator;
   }
 
   /**
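
----------------------------------------------------------------------

Note for consumers of this change: the retry budget and poll timeout that KafkaRecordReader reads in initialize() are ordinary Hadoop Configuration settings, so a job can override the new 120-attempt default where fail-fast behavior is preferred. Below is a minimal sketch, not part of this commit; the class name KafkaReaderRetryTuning is hypothetical, and it assumes CONSUMER_POLL_TIMEOUT_KEY is exposed by KafkaSource alongside the CONSUMER_POLL_TIMEOUT_DEFAULT imported above.

import org.apache.hadoop.conf.Configuration;

import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;

// Hypothetical example class, for illustration only
public class KafkaReaderRetryTuning {

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Worst case, the reader polls up to maxNumberAttempts times, waiting up to
    // the poll timeout on each attempt, before nextKeyValue() throws an
    // IOException; the stall ceiling is roughly attempts * timeout
    // (here 10 * 1000 ms instead of the default 120 attempts).
    conf.setInt(KAFKA_RETRY_ATTEMPTS_KEY, 10);
    conf.setLong(CONSUMER_POLL_TIMEOUT_KEY, 1000L); // milliseconds

    // Pass conf to the job that wraps KafkaInputFormat as usual.
  }
}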