Repository: crunch
Updated Branches:
  refs/heads/master 5944f81b2 -> ef8d60f2b
CRUNCH-621: Added a check in hasPendingData so that, when a large number of requests return no data, the reader verifies there is still data to consume.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ef8d60f2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ef8d60f2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ef8d60f2

Branch: refs/heads/master
Commit: ef8d60f2b51e552a0d2296d2b4711008cd0b58ee
Parents: 5944f81
Author: Micah Whitacre <[email protected]>
Authored: Tue Sep 13 10:35:35 2016 -0500
Committer: Micah Whitacre <[email protected]>
Committed: Wed Oct 19 21:12:49 2016 -0500

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaUtils.java     | 13 +++++
 .../kafka/inputformat/KafkaRecordReader.java    | 52 +++++++++++++++--
 .../kafka/inputformat/KafkaRecordReaderIT.java  | 61 ++++++++++++++++++++
 3 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index 9065bee..f3da5e9 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -71,6 +71,19 @@ public class KafkaUtils {
   public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
 
   /**
+   * Configuration property for the number of retry attempts that will be made to Kafka in the event of getting empty
+   * responses.
+   */
+  public static final String KAFKA_EMPTY_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.empty.attempts";
+
+  /**
+   * Default number of empty retry attempts.
+   */
+  public static final int KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT = 10;
+  public static final String KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT_STRING =
+      Integer.toString(KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+
+  /**
    * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
    * @param config the config to read properties
    * @return a properties instance populated with all of the values inside the provided {@code config}.
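
For illustration only, a minimal sketch of how a job might tune this new property before building its Kafka input; the class name and chosen value are hypothetical, and the surrounding pipeline wiring is assumed rather than shown in this patch:

    import org.apache.crunch.kafka.KafkaUtils;
    import org.apache.hadoop.conf.Configuration;

    public class EmptyRetryConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Allow up to 25 consecutive empty poll responses before the reader
        // double-checks the earliest broker offset; the default is
        // KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT (10).
        conf.setInt(KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, 25);
        // The Configuration is then handed to the Kafka input format / source as usual.
      }
    }
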
http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
index ad73217..14c8030 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -17,7 +17,9 @@
  */
 package org.apache.crunch.kafka.inputformat;
 
+import kafka.api.OffsetRequest;
 import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,11 +36,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
 import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
 
 /**
@@ -59,15 +65,23 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
   private long startingOffset;
   private long currentOffset;
   private int maxNumberAttempts;
+  private Properties connectionProperties;
+  private TopicPartition topicPartition;
+  private int concurrentEmptyResponses;
+  private int maxConcurrentEmptyResponses;
 
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
     if(!(inputSplit instanceof KafkaInputSplit)){
       throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
     }
     KafkaInputSplit split = (KafkaInputSplit) inputSplit;
-    TopicPartition topicPartition = split.getTopicPartition();
+    topicPartition = split.getTopicPartition();
+
+    connectionProperties = getKafkaConnectionProperties(taskAttemptContext.getConfiguration());
+
+    consumer = new KafkaConsumer<>(connectionProperties);
+
     consumer.assign(Collections.singletonList(topicPartition));
     //suggested hack to gather info without gathering data
     consumer.poll(0);
@@ -86,6 +100,8 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
     Configuration config = taskAttemptContext.getConfiguration();
     consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
     maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
+    maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+    concurrentEmptyResponses = 0;
   }
 
   @Override
@@ -130,7 +146,19 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
   private boolean hasPendingData(){
     //offset range is exclusive at the end which means the ending offset is one higher
     // than the actual physical last offset
-    return currentOffset < endingOffset-1;
+
+    boolean hasPending = currentOffset < endingOffset-1;
+
+    if(concurrentEmptyResponses > maxConcurrentEmptyResponses){
+      long earliest = getEarliestOffset();
+      if(earliest == endingOffset){
+        LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.",
+            new Object[]{topicPartition, earliest, endingOffset, currentOffset});
+        return false;
+      }
+    }
+
+    return hasPending;
   }
 
   private Iterator<ConsumerRecord<K, V>> getRecords() {
@@ -151,11 +179,14 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
         }
       }
       if((records == null || records.isEmpty()) && hasPendingData()){
-        LOG.warn("No records retrieved but pending offsets to consume therefore polling again.");
+        concurrentEmptyResponses++;
+        LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
+            concurrentEmptyResponses, maxConcurrentEmptyResponses);
       }else{
         success = true;
       }
     }
+    concurrentEmptyResponses = 0;
 
     if(records == null || records.isEmpty()){
       LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
@@ -171,6 +202,19 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
     return consumer;
   }
 
+  protected long getEarliestOffset(){
+    Map<TopicPartition, Long> brokerOffsets = KafkaUtils
+        .getBrokerOffsets(connectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
+    Long offset = brokerOffsets.get(topicPartition);
+    if(offset == null){
+      LOG.debug("Unable to determine earliest offset for {} so returning earliest {}", topicPartition,
+          OffsetRequest.EarliestTime());
+      return OffsetRequest.EarliestTime();
+    }
+    LOG.debug("Earliest offset for {} is {}", topicPartition, offset);
+    return offset;
+  }
+
   @Override
   public void close() throws IOException {
     LOG.debug("Closing the record reader.");


http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
index 15970c1..c15b4d9 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
@@ -294,6 +294,47 @@ public class KafkaRecordReaderIT {
     assertThat(keysRead.size(), is(keys.size()));
   }
 
+  @Test
+  public void pollEarliestEqualsEnding() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    Set<String> keysRead = new HashSet<>();
+    //read all data from all splits
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
+      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
+          partitionInfo.getValue().first(), partitionInfo.getValue().second());
+
+      when(consumer.poll(Matchers.anyLong())).thenReturn(ConsumerRecords.<String, String>empty());
+      KafkaRecordReader<String, String> recordReader = new EarliestRecordReader<>(consumer,
+          partitionInfo.getValue().second());
+      recordReader.initialize(split, context);
+
+      int numRecordsFound = 0;
+      while (recordReader.nextKeyValue()) {
+        keysRead.add(recordReader.getCurrentKey());
+        numRecordsFound++;
+      }
+      recordReader.close();
+
+      //assert that it encountered a partitions worth of data
+      assertThat(numRecordsFound, is(0));
+    }
+
+    //validate the same number of unique keys was read as were written.
+    assertThat(keysRead.size(), is(0));
+  }
+
 
 
   private static class NullAtStartKafkaRecordReader<K, V> extends KafkaRecordReader<K, V>{
@@ -342,4 +383,24 @@ public class KafkaRecordReaderIT {
     }
   }
 
+  private static class EarliestRecordReader<K,V> extends KafkaRecordReader<K, V>{
+
+    private final long earliest;
+    private final Consumer consumer;
+
+    public EarliestRecordReader(Consumer consumer, long earliest){
+      this.earliest = earliest;
+      this.consumer = consumer;
+    }
+
+    @Override
+    protected Consumer<K, V> getConsumer() {
+      return consumer;
+    }
+
+    @Override
+    protected long getEarliestOffset() {
+      return earliest;
+    }
+  }
 }
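
To summarize the behavior this patch adds: the reader now counts consecutive empty poll responses, and once that count exceeds the configured maximum it asks the broker for the partition's earliest offset; if the earliest offset has advanced all the way to the requested ending offset, the requested range can no longer be returned (likely aged off by retention), so the reader logs a possible-data-loss warning and stops instead of polling indefinitely. Below is a condensed, standalone sketch of that guard with simplified types; the class and method names outside of hasPendingData are illustrative, and the broker lookup is stubbed out rather than using KafkaUtils.getBrokerOffsets as the real reader does:

    // Hypothetical, self-contained paraphrase of the guard added to KafkaRecordReader.
    public class EmptyPollGuardSketch {

      private long currentOffset;
      private long endingOffset;
      private int concurrentEmptyResponses;
      private int maxConcurrentEmptyResponses = 10; // KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT

      // Stand-in for KafkaRecordReader#getEarliestOffset, which queries the broker
      // via KafkaUtils.getBrokerOffsets in the actual patch.
      protected long getEarliestOffset() {
        return 0L;
      }

      // Paraphrases the patched hasPendingData(): the ending offset is exclusive, and
      // after too many consecutive empty polls the earliest broker offset is consulted.
      private boolean hasPendingData() {
        boolean hasPending = currentOffset < endingOffset - 1;
        if (concurrentEmptyResponses > maxConcurrentEmptyResponses) {
          long earliest = getEarliestOffset();
          if (earliest == endingOffset) {
            // Everything up to the requested end has aged off the broker;
            // stop rather than retry forever.
            return false;
          }
        }
        return hasPending;
      }

      // Paraphrases the poll-loop bookkeeping: bump the counter on an empty response
      // while data is still pending, reset it otherwise.
      void onPollResult(boolean gotRecords) {
        if (!gotRecords && hasPendingData()) {
          concurrentEmptyResponses++;
        } else {
          concurrentEmptyResponses = 0;
        }
      }
    }

The integration test above exercises exactly this path: the mocked consumer always returns empty poll results and the overridden getEarliestOffset reports the ending offset, so the reader gives up and reads zero records.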
