This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9067657  [HUDI-2487] Fix JsonKafkaSource cannot filter empty messages from kafka (#3715)
9067657 is described below

commit 9067657a5ff313990c819065ad12d71fa8bb0f06
Author: qianchutao <[email protected]>
AuthorDate: Tue Sep 28 13:47:15 2021 +0800

    [HUDI-2487] Fix JsonKafkaSource cannot filter empty messages from kafka (#3715)
---
 .../hudi/utilities/sources/JsonKafkaSource.java    |  6 +++++-
 .../utilities/sources/TestJsonKafkaSource.java     | 22 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index cf9e905..39340d0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -69,7 +69,11 @@ public class JsonKafkaSource extends JsonSource {
 
   private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
     return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), 
offsetRanges,
-            LocationStrategies.PreferConsistent()).map(x -> (String) 
x.value());
+            LocationStrategies.PreferConsistent()).filter(x -> {
+              String msgValue = (String) x.value();
+              //Filter null messages from Kafka to prevent Exceptions
+              return msgValue != null;
+            }).map(x -> (String) x.value());
   }
 
   @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index da11035..2ed4c42 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -151,6 +151,28 @@ public class TestJsonKafkaSource extends UtilitiesTestBase {
     assertEquals(Option.empty(), fetch4AsRows.getBatch());
   }
 
+  // test whether empty messages can be filtered
+  @Test
+  public void testJsonKafkaSourceFilterNullMsg() {
+    // topic setup.
+    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(null, "earliest");
+
+    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, 
schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting 
sourceLimit
+    assertEquals(Option.empty(), 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 
Long.MAX_VALUE).getBatch());
+    // Send  1000 non-null messages to Kafka
+    testUtils.sendMessages(TEST_TOPIC_NAME, 
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    // Send  100 null messages to Kafka
+    testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = 
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    // Verify that messages with null values are filtered
+    assertEquals(1000, fetch1.getBatch().get().count());
+  }
+
   // test case with kafka offset reset strategy
   @Test
   public void testJsonKafkaSourceResetStrategy() {

Reply via email to