Repository: crunch
Updated Branches:
  refs/heads/master 628098317 -> 2e4729404


CRUNCH-630: set a better default for the situation where offsets are out of 
range.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6cb3cb01
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6cb3cb01
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6cb3cb01

Branch: refs/heads/master
Commit: 6cb3cb01933705ccd4bcc57eaeb1b3ca6dc2c4f9
Parents: 901d064
Author: Micah Whitacre <[email protected]>
Authored: Tue Jan 3 11:39:31 2017 -0600
Committer: Micah Whitacre <[email protected]>
Committed: Tue Jan 3 11:39:31 2017 -0600

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaSource.java    |  8 ++++
 .../org/apache/crunch/kafka/KafkaSourceIT.java  | 41 ++++++++++++++++++++
 2 files changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6cb3cb01/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
index ba7788b..bdcb7a9 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -155,6 +155,10 @@ public class KafkaSource
 
   private static <K, V> Properties copyAndSetProperties(Properties 
kafkaConnectionProperties) {
     Properties props = new Properties();
+
+    //set the default to be earliest for auto reset but allow it to be 
overridden if appropriate.
+    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
     props.putAll(kafkaConnectionProperties);
 
     //Setting the key/value deserializer to ensure proper translation from 
Kafka to PType format.
@@ -198,6 +202,10 @@ public class KafkaSource
     return new KafkaData<>(props, offsets);
   }
 
+  //exposed for testing purposes
+  FormatBundle getInputBundle() {
+    return inputBundle;
+  }
 
   /**
    * Basic {@link Deserializer} which simply wraps the payload as a {@link 
BytesWritable}.

http://git-wip-us.apache.org/repos/asf/crunch/blob/6cb3cb01/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
index 7f1323e..16aa767 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
@@ -25,13 +25,16 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.To;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -40,6 +43,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -83,6 +87,43 @@ public class KafkaSourceIT {
   }
 
   @Test
+  public void defaultEarliestOffsetReset() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
+
+    Configuration config = ClusterTest.getConf();
+
+    //Remove this so it should revert to the default.
+    consumerProps.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+
+    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    FormatBundle inputBundle = kafkaSource.getInputBundle();
+    Configuration cfg = new Configuration(false);
+    inputBundle.configure(cfg);
+    Properties kafkaConnectionProperties = 
KafkaUtils.getKafkaConnectionProperties(cfg);
+    kafkaConnectionProperties = 
KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);
+    
assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 is("earliest"));
+  }
+
+  @Test
+  public void offsetResetOverridable() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
+
+    Configuration config = ClusterTest.getConf();
+
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest");
+
+    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    FormatBundle inputBundle = kafkaSource.getInputBundle();
+    Configuration cfg = new Configuration(false);
+    inputBundle.configure(cfg);
+    Properties kafkaConnectionProperties = 
KafkaUtils.getKafkaConnectionProperties(cfg);
+    kafkaConnectionProperties = 
KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);
+    
assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 is("latest"));
+  }
+
+  @Test
   public void sourceReadData() {
     List<String> keys = 
ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 
10);
     Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, 
OffsetRequest.EarliestTime(), topic);

Reply via email to