Repository: crunch Updated Branches: refs/heads/master 628098317 -> 2e4729404
CRUNCH-630: set a better default for the situation where offsets are out of range. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6cb3cb01 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6cb3cb01 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6cb3cb01 Branch: refs/heads/master Commit: 6cb3cb01933705ccd4bcc57eaeb1b3ca6dc2c4f9 Parents: 901d064 Author: Micah Whitacre <[email protected]> Authored: Tue Jan 3 11:39:31 2017 -0600 Committer: Micah Whitacre <[email protected]> Committed: Tue Jan 3 11:39:31 2017 -0600 ---------------------------------------------------------------------- .../org/apache/crunch/kafka/KafkaSource.java | 8 ++++ .../org/apache/crunch/kafka/KafkaSourceIT.java | 41 ++++++++++++++++++++ 2 files changed, 49 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/6cb3cb01/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java index ba7788b..bdcb7a9 100644 --- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java @@ -155,6 +155,10 @@ public class KafkaSource private static <K, V> Properties copyAndSetProperties(Properties kafkaConnectionProperties) { Properties props = new Properties(); + + //set the default to be earliest for auto reset but allow it to be overridden if appropriate. + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.putAll(kafkaConnectionProperties); //Setting the key/value deserializer to ensure proper translation from Kafka to PType format. @@ -198,6 +202,10 @@ public class KafkaSource return new KafkaData<>(props, offsets); } + //exposed for testing purposes + FormatBundle getInputBundle() { + return inputBundle; + } /** * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}. http://git-wip-us.apache.org/repos/asf/crunch/blob/6cb3cb01/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java index 7f1323e..16aa767 100644 --- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java @@ -25,13 +25,16 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.TableSource; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.From; import org.apache.crunch.io.To; +import org.apache.crunch.kafka.inputformat.KafkaInputFormat; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.TopicPartition; import org.junit.AfterClass; import org.junit.Before; @@ -40,6 +43,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,6 +87,43 @@ public class KafkaSourceIT { } @Test + public void defaultEarliestOffsetReset() { + Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap(); + + Configuration config = ClusterTest.getConf(); + + //Remove this so should revert to default. + consumerProps.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + + KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets); + + FormatBundle inputBundle = kafkaSource.getInputBundle(); + Configuration cfg = new Configuration(false); + inputBundle.configure(cfg); + Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg); + kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties); + assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("earliest")); + } + + @Test + public void offsetResetOverridable() { + Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap(); + + Configuration config = ClusterTest.getConf(); + + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + + KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets); + + FormatBundle inputBundle = kafkaSource.getInputBundle(); + Configuration cfg = new Configuration(false); + inputBundle.configure(cfg); + Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg); + kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties); + assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("latest")); + } + + @Test public void sourceReadData() { List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
