This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 21c04cb2610 [HUDI-7009] Filtering out null values from avro kafka source (#9955)
21c04cb2610 is described below
commit 21c04cb26101a03e9d448170be7864ee8e5341ff
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Nov 6 08:40:22 2023 -0500
[HUDI-7009] Filtering out null values from avro kafka source (#9955)
---
.../org/apache/hudi/utilities/sources/AvroKafkaSource.java | 4 ++--
.../apache/hudi/utilities/sources/TestAvroKafkaSource.java | 14 +++++++++++++-
2 files changed, 15 insertions(+), 3 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index cfaae51ae27..e9353bb2666 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -103,14 +103,14 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
//Don't want kafka offsets here so we use originalSchemaProvider
AvroConvertor convertor = new AvroConvertor(originalSchemaProvider.getSourceSchema());
kafkaRDD = KafkaUtils.<String, byte[]>createRDD(sparkContext,
offsetGen.getKafkaParams(), offsetRanges,
- LocationStrategies.PreferConsistent()).map(obj ->
+ LocationStrategies.PreferConsistent()).filter(obj -> obj.value() != null).map(obj ->
new ConsumerRecord<>(obj.topic(), obj.partition(), obj.offset(),
obj.key(), convertor.fromAvroBinary(obj.value())));
} else {
kafkaRDD = KafkaUtils.createRDD(sparkContext,
offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent());
}
- return maybeAppendKafkaOffsets(kafkaRDD);
+ return maybeAppendKafkaOffsets(kafkaRDD.filter(consemerRec -> consemerRec.value() != null));
}
protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 16ec4545665..3daa9505538 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -125,6 +125,16 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
}
}
+ void sendMessagesToKafkaWithNullKafkaValue(String topic, int count, int numPartitions) {
+ Properties config = getProducerProperties();
+ try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+ for (int i = 0; i < count; i++) {
+ // null kafka value
+ producer.send(new ProducerRecord<>(topic, i % numPartitions, "key", null));
+ }
+ }
+ }
+
private Properties getProducerProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", testUtils.brokerAddress());
@@ -185,6 +195,9 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
int numMessages = 30;
testUtils.createTopic(topic,numPartitions);
sendMessagesToKafka(topic, numMessages, numPartitions);
+ // send some null value records
+ sendMessagesToKafkaWithNullKafkaValue(topic, numMessages, numPartitions);
+
AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(avroKafkaSource);
Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
@@ -214,6 +227,5 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
Dataset<Row> nullKafkaKeyDataset = kafkaSourceWithNullKafkaKey.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
.getBatch().get();
assertEquals(numMessages, nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
-
}
}