This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 2a4cfb4  [HUDI-340]: made max events to read from kafka source configurable (#1039)
2a4cfb4 is described below

commit 2a4cfb47c76a0c8800d4998453ec356711807c83
Author: Pratyaksh Sharma <pratyaks...@gmail.com>
AuthorDate: Tue Nov 26 16:04:02 2019 +0530

    [HUDI-340]: made max events to read from kafka source configurable (#1039)
---
 .../utilities/sources/helpers/KafkaOffsetGen.java | 13 ++-
 .../hudi/utilities/sources/TestKafkaSource.java   | 93 ++++++++++++++++++++--
 2 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 4211af6..873e793 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -50,8 +50,6 @@ public class KafkaOffsetGen {
 
   private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);
 
-  private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max
-
   public static class CheckpointUtils {
 
     /**
@@ -170,10 +168,13 @@ public class KafkaOffsetGen {
   /**
    * Configs to be passed for this source. All standard Kafka consumer configs are also respected
    */
-  static class Config {
+  public static class Config {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
+    private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
     private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+    public static final long defaultMaxEventsFromKafkaSource = 5000000;
+    public static long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = defaultMaxEventsFromKafkaSource;
   }
 
   private final HashMap<String, String> kafkaParams;
@@ -229,7 +230,11 @@ public class KafkaOffsetGen {
         new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));
 
     // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
+    long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
+        Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE);
+    maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
+        ? Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE : maxEventsToReadFromKafka;
+    long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
     OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
     return offsetRanges;

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index 241fae0..c1ca1f0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -32,6 +32,7 @@ import org.apache.hudi.utilities.UtilitiesTestBase;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -78,18 +79,26 @@ public class TestKafkaSource extends UtilitiesTestBase {
     testUtils.teardown();
   }
 
-  @Test
-  public void testJsonKafkaSource() throws IOException {
-
-    // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
     props.setProperty("metadata.broker.list", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", "smallest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+        maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
+            String.valueOf(Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE));
+    return props;
+  }
+
+  @Test
+  public void testJsonKafkaSource() throws IOException {
+
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(null);
 
     Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
@@ -131,6 +140,78 @@ public class TestKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch4AsRows.getBatch());
   }
 
+  @Test
+  public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE);
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500;
+
+    /*
+     1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
+     maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
+     */
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(500, fetch1.getBatch().get().count());
+
+    // 2. Produce new data, extract new data based on sourceLimit
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    InputBatch<Dataset<Row>> fetch2 =
+        kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
+    assertEquals(1500, fetch2.getBatch().get().count());
+
+    //reset the value back since it is a static variable
+    Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource;
+  }
+
+  @Test
+  public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(500L);
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+    assertEquals(900, fetch1.getBatch().get().count());
+
+    // 2. Produce new data, extract new data based on upper cap
+    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    InputBatch<Dataset<Row>> fetch2 =
+        kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(500, fetch2.getBatch().get().count());
+
+    //fetch data respecting source limit where upper cap > sourceLimit
+    InputBatch<JavaRDD<GenericRecord>> fetch3 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400);
+    assertEquals(400, fetch3.getBatch().get().count());
+
+    //fetch data respecting source limit where upper cap < sourceLimit
+    InputBatch<JavaRDD<GenericRecord>> fetch4 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600);
+    assertEquals(600, fetch4.getBatch().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    InputBatch<JavaRDD<GenericRecord>> fetch5 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(fetch2.getBatch().get().count(), fetch5.getBatch().get().count());
+    assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    InputBatch<JavaRDD<GenericRecord>> fetch6 =
+        kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(Option.empty(), fetch6.getBatch());
+  }
+
   private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
     HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
     for (int i = 0; i < partitions.length; i++) {
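
Usage note (not part of the commit): the new upper cap is read from the property
"hoodie.deltastreamer.kafka.source.maxEvents". If the property is absent, or is set to
Long.MAX_VALUE or Integer.MAX_VALUE, KafkaOffsetGen falls back to the default of 5,000,000
events. The cap only takes effect when the caller passes sourceLimit == Long.MAX_VALUE; an
explicit sourceLimit wins otherwise, as fetch3 and fetch4 in
testJsonKafkaSourceWithConfigurableUpperCap demonstrate. Below is a minimal sketch of wiring
the property when building source configs, mirroring createPropsForJsonSource() from the test
above. The topic name, broker address, cap value, and class name are placeholders, and the
TypedProperties package path is an assumption based on this era of the codebase.

    import org.apache.hudi.common.util.TypedProperties; // assumed package path

    // Sketch: configure the new per-fetch upper cap for a Kafka source.
    public class MaxEventsConfigSketch {
      public static TypedProperties buildKafkaSourceProps() {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", "my_topic"); // placeholder topic
        props.setProperty("metadata.broker.list", "localhost:9092");              // placeholder broker
        // Cap a single fetch at 100,000 events. Long.MAX_VALUE or Integer.MAX_VALUE
        // here would be replaced by the 5,000,000 default inside KafkaOffsetGen.
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "100000");
        return props;
      }
    }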