This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21c04cb2610 [HUDI-7009] Filtering out null values from avro kafka source (#9955)
21c04cb2610 is described below

commit 21c04cb26101a03e9d448170be7864ee8e5341ff
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Nov 6 08:40:22 2023 -0500

    [HUDI-7009] Filtering out null values from avro kafka source (#9955)
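
For background: Kafka represents deletes as tombstone records whose value is null, and handing such a payload to convertor.fromAvroBinary(...) would fail before this change, so the source now drops them before decoding. The fix is the classic filter-before-map pattern on the RDD. A minimal standalone sketch of that pattern, not part of this commit (plain strings stand in for the Kafka payloads; the class name and local Spark setup are assumptions):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class NullFilterSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("null-filter-sketch").setMaster("local[1]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          // Simulated Kafka values, including tombstone (null) payloads.
          List<String> values = Arrays.asList("a", null, "b", null, "c");
          JavaRDD<String> rdd = jsc.parallelize(values);

          // Filtering nulls first means the map step (the Avro decode in the
          // real source) never sees a null payload.
          long decoded = rdd.filter(v -> v != null)
              .map(String::toUpperCase)
              .count();
          System.out.println("decoded records: " + decoded); // prints 3
        }
      }
    }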
---
 .../org/apache/hudi/utilities/sources/AvroKafkaSource.java |  4 ++--
 .../apache/hudi/utilities/sources/TestAvroKafkaSource.java | 14 +++++++++++++-
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index cfaae51ae27..e9353bb2666 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -103,14 +103,14 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
       //Don't want kafka offsets here so we use originalSchemaProvider
      AvroConvertor convertor = new AvroConvertor(originalSchemaProvider.getSourceSchema());
      kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
-          LocationStrategies.PreferConsistent()).map(obj ->
+          LocationStrategies.PreferConsistent()).filter(obj -> obj.value() != null).map(obj ->
           new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
               obj.key(), convertor.fromAvroBinary(obj.value())));
     } else {
      kafkaRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
           LocationStrategies.PreferConsistent());
     }
-    return maybeAppendKafkaOffsets(kafkaRDD);
+    return maybeAppendKafkaOffsets(kafkaRDD.filter(consumerRec -> consumerRec.value() != null));
   }
 
  protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
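
The hunk above applies the null guard twice: once on the raw byte[] RDD before Avro decoding, and once on the already-deserialized RDD before offsets are appended, so tombstones are dropped on both code paths. The record-rebuild step it wraps can be shown in isolation (a sketch only; decode() below is a stand-in for convertor.fromAvroBinary, not a Hudi API):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class RebuildRecordSketch {
      // Keeps the original topic/partition/offset/key, swaps in the decoded value.
      static ConsumerRecord<String, String> decode(ConsumerRecord<String, byte[]> raw) {
        // The filter added in this commit guarantees raw.value() is non-null here.
        String decoded = new String(raw.value()); // stand-in for Avro decoding
        return new ConsumerRecord<>(raw.topic(), raw.partition(), raw.offset(),
            raw.key(), decoded);
      }

      public static void main(String[] args) {
        ConsumerRecord<String, byte[]> raw =
            new ConsumerRecord<>("impressions", 0, 42L, "key-1", "payload".getBytes());
        ConsumerRecord<String, String> rec = decode(raw);
        System.out.println(rec.topic() + "/" + rec.partition() + "@" + rec.offset()
            + " -> " + rec.value());
      }
    }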
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 16ec4545665..3daa9505538 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -125,6 +125,16 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
     }
   }
 
+  void sendMessagesToKafkaWithNullKafkaValue(String topic, int count, int numPartitions) {
+    Properties config = getProducerProperties();
+    try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+      for (int i = 0; i < count; i++) {
+        // null kafka value
+        producer.send(new ProducerRecord<>(topic, i % numPartitions, "key", null));
+      }
+    }
+  }
+
   private Properties getProducerProperties() {
     Properties props = new Properties();
     props.put("bootstrap.servers", testUtils.brokerAddress());
@@ -185,6 +195,9 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
     int numMessages = 30;
     testUtils.createTopic(topic,numPartitions);
     sendMessagesToKafka(topic, numMessages, numPartitions);
+    // send some null value records
+    sendMessagesToKafkaWithNullKafkaValue(topic, numMessages, numPartitions);
+
    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(avroKafkaSource);
    Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
@@ -214,6 +227,5 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
    Dataset<Row> nullKafkaKeyDataset = kafkaSourceWithNullKafkaKey.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
             .getBatch().get();
    assertEquals(numMessages, nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
-
   }
 }
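
The new test helper above produces tombstone records so the source's filter can be exercised end to end. A self-contained version of that helper looks roughly like this (broker address, topic name, and class name are placeholders; the real test wires in its Kafka test harness via getProducerProperties()):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TombstoneProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        int count = 30;
        int numPartitions = 2;
        try (Producer<String, byte[]> producer = new KafkaProducer<>(props)) {
          for (int i = 0; i < count; i++) {
            // A null value is a Kafka tombstone; AvroKafkaSource now drops these.
            producer.send(new ProducerRecord<>("test-topic", i % numPartitions, "key", null));
          }
          producer.flush();
        }
      }
    }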
