This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c95ebfb47ab2bb07a22747b4a677a79253017ded
Author: Tim Brown <[email protected]>
AuthorDate: Thu Apr 25 16:43:34 2024 -0700

    [MINOR] Make KafkaSource abstraction public and more flexible (#11093)
---
 .../org/apache/hudi/utilities/sources/AvroKafkaSource.java  |  4 ++--
 .../org/apache/hudi/utilities/sources/JsonKafkaSource.java  |  4 ++--
 .../java/org/apache/hudi/utilities/sources/KafkaSource.java | 13 ++++++-------
 .../org/apache/hudi/utilities/sources/ProtoKafkaSource.java |  4 ++--
 .../apache/hudi/utilities/sources/BaseTestKafkaSource.java  |  8 ++++----
 .../apache/hudi/utilities/sources/TestJsonKafkaSource.java  |  6 +++---
 .../apache/hudi/utilities/sources/TestProtoKafkaSource.java |  8 +++-----
 7 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 36c83d63030..66d1cfe61c0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -52,7 +52,7 @@ import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DES
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
  */
-public class AvroKafkaSource extends KafkaSource<GenericRecord> {
+public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AvroKafkaSource.class);
   // These are settings used to pass things to KafkaAvroDeserializer
@@ -106,7 +106,7 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
   }
 
   @Override
-  JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
+  protected JavaRDD<GenericRecord> toBatch(OffsetRange[] offsetRanges) {
     JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD;
     if (deserializerClassName.equals(ByteArrayDeserializer.class.getName())) {
       if (schemaProvider == null) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index c8c3b3421c6..71f0c4db3f1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -55,7 +55,7 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
 /**
  * Read json kafka data.
  */
-public class JsonKafkaSource extends KafkaSource<String> {
+public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
 
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
@@ -71,7 +71,7 @@ public class JsonKafkaSource extends KafkaSource<String> {
   }
 
   @Override
-  JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
+  protected JavaRDD<String> toBatch(OffsetRange[] offsetRanges) {
     JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD = KafkaUtils.createRDD(sparkContext,
             offsetGen.getKafkaParams(),
             offsetRanges,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 52a6a1217cc..3dc7fe69a0d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -29,7 +29,6 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.streamer.SourceProfile;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.kafka010.OffsetRange;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 
-abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
+public abstract class KafkaSource<T> extends Source<T> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
   // these are native kafka's config. do not change the config names.
   protected static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
@@ -60,7 +59,7 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   }
 
   @Override
-  protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
+  protected InputBatch<T> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     try {
       OffsetRange[] offsetRanges;
       if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
@@ -78,7 +77,7 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
     }
   }
 
-  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
+  private InputBatch<T> toInputBatch(OffsetRange[] offsetRanges) {
     long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     if (totalNewMsgs <= 0) {
@@ -86,11 +85,11 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
       return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     }
     metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
-    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    T newBatch = toBatch(offsetRanges);
+    return new InputBatch<>(Option.of(newBatch), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
   }
 
-  abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
+  protected abstract T toBatch(OffsetRange[] offsetRanges);
 
   @Override
   public void onCommit(String lastCkptStr) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index d7a15b3932c..1dc731b5f95 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -51,7 +51,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 /**
  * Reads protobuf serialized Kafka data, based on a provided class name.
  */
-public class ProtoKafkaSource extends KafkaSource<Message> {
+public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
 
   private final String className;
 
@@ -75,7 +75,7 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
   }
 
   @Override
-  JavaRDD<Message> toRDD(OffsetRange[] offsetRanges) {
+  protected JavaRDD<Message> toBatch(OffsetRange[] offsetRanges) {
     ProtoDeserializer deserializer = new ProtoDeserializer(className);
     return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
         LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index e45d10e7a61..34db1acdd93 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -60,7 +60,7 @@ import static org.mockito.Mockito.when;
 /**
  * Generic tests for all {@link KafkaSource} to ensure all implementations properly handle offsets, fetch limits, failure modes, etc.
  */
-abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
+public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
   protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
 
   protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
@@ -80,11 +80,11 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
     testUtils.teardown();
   }
 
-  abstract TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy);
+  protected abstract TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy);
 
-  abstract SourceFormatAdapter createSource(TypedProperties props);
+  protected abstract SourceFormatAdapter createSource(TypedProperties props);
 
-  abstract void sendMessagesToKafka(String topic, int count, int numPartitions);
+  protected abstract void sendMessagesToKafka(String topic, int count, int numPartitions);
 
   @Test
   public void testKafkaSource() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 5c269ab036a..92238721fcd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -87,7 +87,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
   }
 
   @Override
-  TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+  protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
     return createPropsForJsonKafkaSource(testUtils.brokerAddress(), topic, maxEventsToReadFromKafkaSource, resetStrategy);
   }
 
@@ -105,7 +105,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
   }
 
   @Override
-  SourceFormatAdapter createSource(TypedProperties props) {
+  protected SourceFormatAdapter createSource(TypedProperties props) {
     return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile)));
   }
 
@@ -204,7 +204,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
   }
 
   @Override
-  void sendMessagesToKafka(String topic, int count, int numPartitions) {
+  protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     testUtils.sendMessages(topic, jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions));
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index f9679211144..662cd1dd985 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.utilities.sources;
 
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
@@ -88,7 +87,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
   }
 
   @Override
-  SourceFormatAdapter createSource(TypedProperties props) {
+  protected SourceFormatAdapter createSource(TypedProperties props) {
     this.schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
     Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), metrics, new DefaultStreamContext(schemaProvider, sourceProfile));
     return new SourceFormatAdapter(protoKafkaSource);
@@ -112,8 +111,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(900, fetch1.getBatch().get().count());
     // Test Avro To DataFrame<Row> path
-    Dataset<Row> fetch1AsRows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
-        schemaProvider.getSourceSchema().toString(), protoKafkaSource.getSparkSession());
+    Dataset<Row> fetch1AsRows = kafkaSource.fetchNewDataInRowFormat(Option.empty(), 900).getBatch().get();
     assertEquals(900, fetch1AsRows.count());
 
     // 2. Produce new data, extract new data
@@ -196,7 +194,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
   }
 
   @Override
-  void sendMessagesToKafka(String topic, int count, int numPartitions) {
+  protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
     List<Sample> messages = createSampleMessages(count);
     try (Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties())) {
       for (int i = 0; i < messages.size(); i++) {

Reply via email to