This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a4cfb4  [HUDI-340]: made max events to read from kafka source configurable (#1039)
2a4cfb4 is described below

commit 2a4cfb47c76a0c8800d4998453ec356711807c83
Author: Pratyaksh Sharma <pratyaks...@gmail.com>
AuthorDate: Tue Nov 26 16:04:02 2019 +0530

    [HUDI-340]: made max events to read from kafka source configurable (#1039)
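
    For context, a minimal sketch of how a user might set the new knob when
    building DeltaStreamer properties. This is an illustrative assumption, not
    part of the commit: the property name comes from the diff below, while the
    topic name and the 500000 cap are made up for the example.

        // Hypothetical usage: cap each DeltaStreamer batch at 500k Kafka events.
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic"); // assumed topic name
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500000"); // new config from this commit
        // If unset, KafkaOffsetGen falls back to DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE (5,000,000 events).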
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 13 ++-
 .../hudi/utilities/sources/TestKafkaSource.java    | 93 ++++++++++++++++++++--
 2 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 4211af6..873e793 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -50,8 +50,6 @@ public class KafkaOffsetGen {
 
   private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);
 
-  private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max
-
   public static class CheckpointUtils {
 
     /**
@@ -170,10 +168,13 @@ public class KafkaOffsetGen {
   /**
    * Configs to be passed for this source. All standard Kafka consumer configs are also respected
    */
-  static class Config {
+  public static class Config {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
+    private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
     private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+    public static final long defaultMaxEventsFromKafkaSource = 5000000;
+    public static long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = defaultMaxEventsFromKafkaSource;
   }
 
   private final HashMap<String, String> kafkaParams;
@@ -229,7 +230,11 @@ public class KafkaOffsetGen {
         new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
 
     // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
+    long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
+        Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE);
+    maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
+        ? Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE : maxEventsToReadFromKafka;
+    long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
     OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
 
     return offsetRanges;
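
The capping rule introduced above reads more easily outside diff markers. Below is
a self-contained sketch of the same decision logic; the method and parameter names
are mine, not from the patch, but the constants, sentinels, and fallbacks mirror
the hunk above.

    // Sketch of the batch-size decision in the patched offset-range computation.
    static long effectiveNumEvents(long sourceLimit, long configuredMax, long defaultMax) {
      // Long.MAX_VALUE / Integer.MAX_VALUE sentinels mean "no real cap configured",
      // so fall back to the default cap, as the guard in the patch does.
      long cap = (configuredMax == Long.MAX_VALUE || configuredMax == Integer.MAX_VALUE)
          ? defaultMax : configuredMax;
      // An explicit sourceLimit always wins; only an unbounded request uses the cap.
      return sourceLimit == Long.MAX_VALUE ? cap : sourceLimit;
    }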
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index 241fae0..c1ca1f0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -32,6 +32,7 @@ import org.apache.hudi.utilities.UtilitiesTestBase;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -78,18 +79,26 @@ public class TestKafkaSource extends UtilitiesTestBase {
     testUtils.teardown();
   }
 
-  @Test
-  public void testJsonKafkaSource() throws IOException {
-
-    // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("metadata.broker.list", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", "smallest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+        maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+            String.valueOf(Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE));
+    return props;
+  }
+
+  @Test
+  public void testJsonKafkaSource() throws IOException {
+
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(null);
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
@@ -131,6 +140,78 @@ public class TestKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch4AsRows.getBatch());
   }
 
+  @Test
+  public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE);
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500;
+
+    /*
+    1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
+    maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
+     */
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(500, fetch1.getBatch().get().count());
+
+    // 2. Produce new data, extract new data based on sourceLimit
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    InputBatch<Dataset<Row>> fetch2 =
+        kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
+    assertEquals(1500, fetch2.getBatch().get().count());
+
+    //reset the value back since it is a static variable
+    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource;
+  }
+
+  @Test
+  public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(500L);
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+    assertEquals(900, fetch1.getBatch().get().count());
+
+    // 2. Produce new data, extract new data based on upper cap
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    InputBatch<Dataset<Row>> fetch2 =
+        kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(500, fetch2.getBatch().get().count());
+
+    //fetch data respecting source limit where upper cap > sourceLimit
+    InputBatch<JavaRDD<GenericRecord>> fetch3 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400);
+    assertEquals(400, fetch3.getBatch().get().count());
+
+    //fetch data respecting source limit where upper cap < sourceLimit
+    InputBatch<JavaRDD<GenericRecord>> fetch4 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600);
+    assertEquals(600, fetch4.getBatch().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    InputBatch<JavaRDD<GenericRecord>> fetch5 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(fetch2.getBatch().get().count(), fetch5.getBatch().get().count());
+    assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    InputBatch<JavaRDD<GenericRecord>> fetch6 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(Option.empty(), fetch6.getBatch());
+  }
+
   private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
     HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
     for (int i = 0; i < partitions.length; i++) {
