Repository: crunch
Updated Branches:
  refs/heads/master 614364aa9 -> 41b201a9e


CRUNCH-662: Updated KafkaRecordReader to better handle errors and empty reads, and to retry appropriately

Signed-off-by: Josh Wills <jwi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/41b201a9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/41b201a9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/41b201a9

Branch: refs/heads/master
Commit: 41b201a9e546f7860e09cb30f1b7cecd4507d267
Parents: 614364a
Author: Bryan Baugher <bryan.baug...@cerner.com>
Authored: Wed Jan 24 14:14:31 2018 -0600
Committer: Josh Wills <jwi...@apache.org>
Committed: Wed Jan 24 21:42:58 2018 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaUtils.java     |   2 +-
 .../crunch/kafka/record/KafkaRecordReader.java  | 106 +++++++++++--------
 2 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
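
The retry behaviour touched by this commit remains configurable through the job's Hadoop
Configuration. Below is a minimal sketch of overriding the retry count; it relies only on the
KafkaUtils constants the reader imports in the diff further down (KAFKA_RETRY_ATTEMPTS_KEY and
KAFKA_RETRY_ATTEMPTS_DEFAULT), while the class name ConfigureKafkaReaderRetries is purely
illustrative.

import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;

public class ConfigureKafkaReaderRetries {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Cap the number of poll attempts KafkaRecordReader will make before failing the task;
    // if unset, the reader falls back to KAFKA_RETRY_ATTEMPTS_DEFAULT (now 120).
    conf.setInt(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, 60);

    System.out.println("Configured retry attempts: "
        + conf.getInt(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT));
  }
}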


http://git-wip-us.apache.org/repos/asf/crunch/blob/41b201a9/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index 2ed6412..2681df0 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -69,7 +69,7 @@ public class KafkaUtils {
   /**
    * Default number of retry attempts.
    */
-  public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5;
+  public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 120;
   public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
 
   /**
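
Raising the default from 5 to 120 attempts changes how long a reader keeps polling before giving
up: each attempt blocks in consumer.poll(consumerPollTimeout), so the worst case is roughly
maxNumberAttempts * consumerPollTimeout. A back-of-the-envelope sketch follows, where the 1000 ms
poll timeout is an assumption for illustration only (the actual CONSUMER_POLL_TIMEOUT_DEFAULT is
not shown in this diff).

public class ReaderRetryBudget {
  public static void main(String[] args) {
    int maxNumberAttempts = 120;        // new KAFKA_RETRY_ATTEMPTS_DEFAULT from the hunk above
    long consumerPollTimeoutMs = 1000L; // assumed poll timeout, not the actual default
    long worstCaseMs = maxNumberAttempts * consumerPollTimeoutMs;
    // 120 * 1000 ms = 120000 ms, i.e. about 2 minutes of polling before the task fails
    System.out.println("Worst-case polling time before the task fails: " + worstCaseMs + " ms");
  }
}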

http://git-wip-us.apache.org/repos/asf/crunch/blob/41b201a9/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index ef2ec49..754c9ce 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -37,10 +37,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
 import static org.apache.crunch.kafka.record.KafkaInputFormat.filterConnectionProperties;
 import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
@@ -66,8 +64,6 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
   private long currentOffset;
   private int maxNumberAttempts;
   private Properties kafkaConnectionProperties;
-  private int maxConcurrentEmptyResponses;
-  private int concurrentEmptyResponses;
   private TopicPartition topicPartition;
 
   @Override
@@ -104,14 +100,12 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
     Configuration config = taskAttemptContext.getConfiguration();
     consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
     maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
-    maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
-    concurrentEmptyResponses = 0;
   }
 
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
     if (hasPendingData()) {
-      recordIterator = getRecords();
+      loadRecords();
       record = recordIterator.hasNext() ? recordIterator.next() : null;
       if (record != null) {
         LOG.debug("nextKeyValue: Retrieved record with offset {}", 
record.offset());
@@ -119,20 +113,46 @@ public class KafkaRecordReader<K, V> extends 
RecordReader<ConsumerRecord<K, V>,
         currentOffset = record.offset();
         LOG.debug("Current offset will be updated to be [{}]", currentOffset);
         if (LOG.isWarnEnabled() && (currentOffset - oldOffset > 1)) {
-          LOG.warn("Offset increment was larger than expected value of one, old {} new {}", oldOffset, currentOffset);
+          // The most likely scenario here is our start offset was deleted and an offset reset took place by the consumer
+          LOG.warn("Possible data loss in partition {} as offset {} was larger than expected {}",
+                  new Object[] { topicPartition, currentOffset, oldOffset + 1});
         }
+
+        if (currentOffset >= endingOffset) {
+          // We had pending data but read a record beyond our end offset so don't include it and stop reading
+          if (LOG.isWarnEnabled())
+            LOG.warn("Record offset {} is beyond our end offset {}. This could indicate data loss in partition {}",
+                  new Object[] { currentOffset, endingOffset, topicPartition});
+          record = null;
+          return false;
+        }
+
         return true;
+      } else if (isPartitionEmpty()) {
+        // If the partition is empty but we are expecting to read data we would read no records and
+        // see no errors so handle that here
+        if (LOG.isWarnEnabled())
+          LOG.warn("The partition {} is empty though we expected to read from {} to {}. This could indicate data loss",
+                  new Object[] {topicPartition, currentOffset, endingOffset});
       } else {
-        LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset);
+        // We have pending data but we are unable to fetch any records so throw an exception and stop the job
+        throw new IOException("Unable to read additional data from Kafka. See logs for details. Partition " +
+                topicPartition + " Current Offset: " + currentOffset + " End Offset: " + endingOffset);
       }
     }
     record = null;
     return false;
   }
 
+  private boolean isPartitionEmpty() {
+    // We don't need to fetch the ending offset as well since that is looked up when running the job. If the end offset had
+    // changed while running we would have read that record instead of reading no records and calling this method
+    return getEarliestOffset() == endingOffset;
+  }
+
   @Override
   public ConsumerRecord<K, V> getCurrentKey() throws IOException, InterruptedException {
-    return record == null ? null : record;
+    return record;
   }
 
   @Override
@@ -149,56 +169,54 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
   private boolean hasPendingData() {
     //offset range is exclusive at the end which means the ending offset is one higher
     // than the actual physical last offset
-
-    boolean hasPending = currentOffset < endingOffset - 1;
-
-    if (concurrentEmptyResponses > maxConcurrentEmptyResponses) {
-      long earliest = getEarliestOffset();
-      if (earliest == endingOffset) {
-        LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.",
-            new Object[] { topicPartition, earliest, endingOffset, currentOffset });
-        return false;
-      }
-    }
-
-    return hasPending;
+    return currentOffset < endingOffset - 1;
   }
 
-  private Iterator<ConsumerRecord<K, V>> getRecords() {
+  /**
+   * Loads new records into the record iterator
+   */
+  private void loadRecords() {
     if ((recordIterator == null) || !recordIterator.hasNext()) {
       ConsumerRecords<K, V> records = null;
       int numTries = 0;
       boolean success = false;
-      while (!success && (numTries < maxNumberAttempts)) {
+      while(!success && (numTries < maxNumberAttempts)) {
+        numTries++;
         try {
           records = getConsumer().poll(consumerPollTimeout);
         } catch (RetriableException re) {
-          numTries++;
-          if (numTries < maxNumberAttempts) {
-            LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries + 1, re);
-          } else {
-            LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re);
-            throw re;
-          }
+          LOG.warn("Error pulling messages from Kafka", re);
         }
-        if (((records == null) || records.isEmpty()) && hasPendingData()) {
-          concurrentEmptyResponses++;
-          LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
-              concurrentEmptyResponses, maxConcurrentEmptyResponses);
-        } else {
+
+        // There was no error but we don't have any records. This could indicate a slowness or failure in Kafka's internal
+        // client or that the partition has no more data
+        if (records != null && records.isEmpty()){
+          if (LOG.isWarnEnabled())
+            LOG.warn("No records retrieved from partition {} with poll timeout {} but pending offsets to consume. Current "
+                  + "Offset: {}, End Offset: {}", new Object[] { topicPartition, consumerPollTimeout, currentOffset,
+                    endingOffset });
+        } else if (records != null && !records.isEmpty()){
           success = true;
         }
+
+        if (!success && numTries < maxNumberAttempts) {
+          LOG.info("Record fetch attempt {} / {} failed, retrying", numTries, maxNumberAttempts);
+        }
+        else if (!success) {
+          if (LOG.isWarnEnabled())
+            LOG.warn("Record fetch attempt {} / {} failed. No more attempts left for partition {}",
+                    new Object[] { numTries, maxNumberAttempts, topicPartition });
+        }
       }
-      concurrentEmptyResponses = 0;
 
-      if ((records == null) || records.isEmpty()) {
-        LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
-      } else {
-        LOG.info("Retrieved records from Kafka to iterate over.");
+      if ((records == null) || records.isEmpty()){
+        LOG.info("No records retrieved from Kafka partition {} therefore nothing to iterate over", topicPartition);
+      } else{
+        LOG.info("Retrieved records {} from Kafka partition {} to iterate over", records.count(), topicPartition);
       }
-      return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
+
+      recordIterator = records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
     }
-    return recordIterator;
   }
 
   /**
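
Because the new control flow is spread over several hunks, here is a condensed, standalone
paraphrase of how nextKeyValue() behaves after this change. It is not the committed class: the
Kafka types are replaced with plain longs, polling and offset lookups are stubbed, and only the
branching shown in the diff above is kept.

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

// Simplified paraphrase of the updated KafkaRecordReader flow (illustrative only).
public class SimplifiedReaderFlow {
  private long currentOffset = -1L;
  private long endingOffset = 10L;
  private Iterator<Long> recordIterator = Collections.emptyIterator();
  private Long record;

  boolean nextKeyValue() throws IOException {
    if (currentOffset < endingOffset - 1) {             // hasPendingData()
      loadRecords();                                     // poll with bounded retries
      record = recordIterator.hasNext() ? recordIterator.next() : null;
      if (record != null) {
        long oldOffset = currentOffset;
        currentOffset = record;                          // record.offset() in the real class
        if (currentOffset - oldOffset > 1) {
          // likely an offset reset after the start offset was deleted: warn about possible data loss
        }
        if (currentOffset >= endingOffset) {
          record = null;                                 // read past the split's end offset: drop it and stop
          return false;
        }
        return true;
      } else if (getEarliestOffset() == endingOffset) {  // isPartitionEmpty()
        // partition is empty although data was expected: warn, then report no more records
      } else {
        // pending data but nothing could be fetched after all retries: fail the task
        throw new IOException("Unable to read additional data from Kafka");
      }
    }
    record = null;
    return false;
  }

  private void loadRecords() { /* bounded retry around consumer.poll(...) in the real class */ }

  private long getEarliestOffset() { return endingOffset; /* stub for illustration */ }
}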
