This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9067657 [HUDI-2487] Fix JsonKafkaSource failing to filter out empty (null-valued) messages from Kafka (#3715)
9067657 is described below
commit 9067657a5ff313990c819065ad12d71fa8bb0f06
Author: qianchutao <[email protected]>
AuthorDate: Tue Sep 28 13:47:15 2021 +0800
[HUDI-2487] Fix JsonKafkaSource failing to filter out empty (null-valued) messages from Kafka (#3715)
---
.../hudi/utilities/sources/JsonKafkaSource.java | 6 +++++-
.../utilities/sources/TestJsonKafkaSource.java | 22 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index cf9e905..39340d0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -69,7 +69,11 @@ public class JsonKafkaSource extends JsonSource {
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(),
offsetRanges,
- LocationStrategies.PreferConsistent()).map(x -> (String)
x.value());
+ LocationStrategies.PreferConsistent()).filter(x -> {
+ String msgValue = (String) x.value();
+ //Filter null messages from Kafka to prevent Exceptions
+ return msgValue != null;
+ }).map(x -> (String) x.value());
}
@Override
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index da11035..2ed4c42 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -151,6 +151,28 @@ public class TestJsonKafkaSource extends UtilitiesTestBase
{
assertEquals(Option.empty(), fetch4AsRows.getBatch());
}
+ // test whether empty messages can be filtered
+ @Test
+ public void testJsonKafkaSourceFilterNullMsg() {
+ // topic setup.
+ testUtils.createTopic(TEST_TOPIC_NAME, 2);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ TypedProperties props = createPropsForJsonSource(null, "earliest");
+
+ Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession,
schemaProvider, metrics);
+ SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+ // 1. Extract without any checkpoint => get all the data, respecting
sourceLimit
+ assertEquals(Option.empty(),
kafkaSource.fetchNewDataInAvroFormat(Option.empty(),
Long.MAX_VALUE).getBatch());
+ // Send 1000 non-null messages to Kafka
+ testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ // Send 100 null messages to Kafka
+ testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
+ InputBatch<JavaRDD<GenericRecord>> fetch1 =
kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+ // Verify that messages with null values are filtered
+ assertEquals(1000, fetch1.getBatch().get().count());
+ }
+
// test case with kafka offset reset strategy
@Test
public void testJsonKafkaSourceResetStrategy() {