This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c95ebfb47ab2bb07a22747b4a677a79253017ded Author: Tim Brown <[email protected]> AuthorDate: Thu Apr 25 16:43:34 2024 -0700 [MINOR] Make KafkaSource abstraction public and more flexible (#11093) --- .../org/apache/hudi/utilities/sources/AvroKafkaSource.java | 4 ++-- .../org/apache/hudi/utilities/sources/JsonKafkaSource.java | 4 ++-- .../java/org/apache/hudi/utilities/sources/KafkaSource.java | 13 ++++++------- .../org/apache/hudi/utilities/sources/ProtoKafkaSource.java | 4 ++-- .../apache/hudi/utilities/sources/BaseTestKafkaSource.java | 8 ++++---- .../apache/hudi/utilities/sources/TestJsonKafkaSource.java | 6 +++--- .../apache/hudi/utilities/sources/TestProtoKafkaSource.java | 8 +++----- 7 files changed, 22 insertions(+), 25 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 36c83d63030..66d1cfe61c0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -52,7 +52,7 @@ import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DES /** * Reads avro serialized Kafka data, based on the confluent schema-registry. */ -public class AvroKafkaSource extends KafkaSource<GenericRecord> { +public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> { private static final Logger LOG = LoggerFactory.getLogger(AvroKafkaSource.class); // These are settings used to pass things to KafkaAvroDeserializer @@ -106,7 +106,7 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> { } @Override - JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) { + protected JavaRDD<GenericRecord> toBatch(OffsetRange[] offsetRanges) { JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD; if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) { if (schemaProvider == null) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index c8c3b3421c6..71f0c4db3f1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -55,7 +55,7 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO /** * Read json kafka data. */ -public class JsonKafkaSource extends KafkaSource<String> { +public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> { public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { @@ -71,7 +71,7 @@ public class JsonKafkaSource extends KafkaSource<String> { } @Override - JavaRDD<String> toRDD(OffsetRange[] offsetRanges) { + protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) { JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java index 52a6a1217cc..3dc7fe69a0d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java @@ -29,7 +29,6 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.streamer.SourceProfile; import org.apache.hudi.utilities.streamer.StreamContext; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.kafka010.OffsetRange; @@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; -abstract class KafkaSource<T> extends Source<JavaRDD<T>> { +public abstract class KafkaSource<T> extends Source<T> { private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); // these are native kafka's config. do not change the config names. protected static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer"; @@ -60,7 +59,7 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> { } @Override - protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) { + protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) { try { OffsetRange[] offsetRanges; if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) { @@ -78,7 +77,7 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> { } } - private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) { + private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) { long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges); LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); if (totalNewMsgs <= 0) { @@ -86,11 +85,11 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> { return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs); - JavaRDD<T> newDataRDD = toRDD(offsetRanges); - return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); + T newBatch = toBatch(offsetRanges); + return new InputBatch<>(Option.of(newBatch), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } - abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges); + protected abstract T toBatch(OffsetRange[] offsetRanges); @Override public void onCommit(String lastCkptStr) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java index d7a15b3932c..1dc731b5f95 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java @@ -51,7 +51,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; /** * Reads protobuf serialized Kafka data, based on a provided class name. */ -public class ProtoKafkaSource extends KafkaSource<Message> { +public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> { private final String className; @@ -75,7 +75,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> { } @Override - JavaRDD<Message> toRDD(OffsetRange[] offsetRanges) { + protected JavaRDD<Message> toBatch(OffsetRange[] offsetRanges) { ProtoDeserializer deserializer = new ProtoDeserializer(className); return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value())); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java index e45d10e7a61..34db1acdd93 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java @@ -60,7 +60,7 @@ import static org.mockito.Mockito.when; /** * Generic tests for all {@link KafkaSource} to ensure all implementations properly handle offsets, fetch limits, failure modes, etc. */ -abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { +public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { protected static final String TEST_TOPIC_PREFIX = "hoodie_test_"; protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); @@ -80,11 +80,11 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { testUtils.teardown(); } - abstract TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy); + protected abstract TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy); - abstract SourceFormatAdapter createSource(TypedProperties props); + protected abstract SourceFormatAdapter createSource(TypedProperties props); - abstract void sendMessagesToKafka(String topic, int count, int numPartitions); + protected abstract void sendMessagesToKafka(String topic, int count, int numPartitions); @Test public void testKafkaSource() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index 5c269ab036a..92238721fcd 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -87,7 +87,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { } @Override - TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { + protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { return createPropsForJsonKafkaSource(testUtils.brokerAddress(), topic, maxEventsToReadFromKafkaSource, resetStrategy); } @@ -105,7 +105,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { } @Override - SourceFormatAdapter createSource(TypedProperties props) { + protected SourceFormatAdapter createSource(TypedProperties props) { return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile))); } @@ -204,7 +204,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { } @Override - void sendMessagesToKafka(String topic, int count, int numPartitions) { + protected void sendMessagesToKafka(String topic, int count, int numPartitions) { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); testUtils.sendMessages(topic, jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions)); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java index f9679211144..662cd1dd985 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java @@ -18,7 +18,6 @@ package org.apache.hudi.utilities.sources; -import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.config.KafkaSourceConfig; @@ -88,7 +87,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource { } @Override - SourceFormatAdapter createSource(TypedProperties props) { + protected SourceFormatAdapter createSource(TypedProperties props) { this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc()); Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile)); return new SourceFormatAdapter(protoKafkaSource); @@ -112,8 +111,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource { InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900); assertEquals(900, fetch1.getBatch().get().count()); // Test Avro To DataFrame<Row> path - Dataset<Row> fetch1AsRows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()), - schemaProvider.getSourceSchema().toString(), protoKafkaSource.getSparkSession()); + Dataset<Row> fetch1AsRows = kafkaSource.fetchNewDataInRowFormat(Option.empty(), 900).getBatch().get(); assertEquals(900, fetch1AsRows.count()); // 2. Produce new data, extract new data @@ -196,7 +194,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource { } @Override - void sendMessagesToKafka(String topic, int count, int numPartitions) { + protected void sendMessagesToKafka(String topic, int count, int numPartitions) { List<Sample> messages = createSampleMessages(count); try (Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties())) { for (int i = 0; i < messages.size(); i++) {
