This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ed5997348f5 [HUDI-6683][FOLLOW-UP] Json & Avro Kafka Source Minor
Refactor & Added null Kafka Key test cases (#9459)
ed5997348f5 is described below
commit ed5997348f5284e107f0ca177241aa5ffc832f62
Author: Prathit malik <[email protected]>
AuthorDate: Tue Aug 22 06:31:47 2023 +0530
[HUDI-6683][FOLLOW-UP] Json & Avro Kafka Source Minor Refactor & Added null
Kafka Key test cases (#9459)
---
.../hudi/utilities/sources/JsonKafkaSource.java | 2 +-
.../utilities/sources/helpers/AvroConvertor.java | 11 ++++----
.../utilities/sources/TestAvroKafkaSource.java | 30 ++++++++++++++++++++++
.../utilities/sources/TestJsonKafkaSource.java | 14 ++++++++++
.../utilities/testutils/UtilitiesTestBase.java | 9 +++++++
5 files changed, 60 insertions(+), 6 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index f31c9b7e542..eb67abfee3a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -81,7 +81,7 @@ public class JsonKafkaSource extends KafkaSource<String> {
ObjectMapper om = new ObjectMapper();
partitionIterator.forEachRemaining(consumerRecord -> {
String recordValue = consumerRecord.value().toString();
- String recordKey = consumerRecord.key().toString();
+ String recordKey = StringUtils.objToString(consumerRecord.key());
try {
ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 89191cb465c..f9c35bd3b6e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import com.google.protobuf.Message;
@@ -171,16 +172,16 @@ public class AvroConvertor implements Serializable {
*/
public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
initSchema();
- GenericRecord record = (GenericRecord) consumerRecord.value();
+ GenericRecord recordValue = (GenericRecord) consumerRecord.value();
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
- for (Schema.Field field : record.getSchema().getFields()) {
- recordBuilder.set(field, record.get(field.name()));
+ for (Schema.Field field : recordValue.getSchema().getFields()) {
+ recordBuilder.set(field, recordValue.get(field.name()));
}
-
+ String recordKey = StringUtils.objToString(consumerRecord.key());
recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
- recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString());
+ recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, recordKey);
return recordBuilder.build();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 2632f72659b..16ec4545665 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -62,6 +62,7 @@ import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
@@ -113,6 +114,17 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
}
}
+ void sendMessagesToKafkaWithNullKafkaKey(String topic, int count, int numPartitions) {
+ List<GenericRecord> genericRecords = dataGen.generateGenericRecords(count);
+ Properties config = getProducerProperties();
+ try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+ for (int i = 0; i < genericRecords.size(); i++) {
+ // null kafka key
producer.send(new ProducerRecord<>(topic, i % numPartitions, null, HoodieAvroUtils.avroToBytes(genericRecords.get(i))));
+ }
+ }
+ }
+
private Properties getProducerProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", testUtils.brokerAddress());
@@ -147,6 +159,15 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
GenericRecord withKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
assertEquals(4,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size());
+ assertEquals("test",withKafkaOffsets.get("_hoodie_kafka_source_key").toString());
+
+ // scenario with null kafka key
+ ConsumerRecord<Object, Object> recordConsumerRecordNullKafkaKey = new ConsumerRecord<Object,Object>("test", 0, 1L,
+ null, dataGen.generateGenericRecord());
+ JavaRDD<ConsumerRecord<Object, Object>> rddNullKafkaKey = jsc().parallelize(Arrays.asList(recordConsumerRecordNullKafkaKey));
+ avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
+ GenericRecord withKafkaOffsetsAndNullKafkaKey = avroKafkaSource.maybeAppendKafkaOffsets(rddNullKafkaKey).collect().get(0);
+ assertNull(withKafkaOffsetsAndNullKafkaKey.get("_hoodie_kafka_source_key"));
}
@Test
@@ -185,5 +206,14 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
assertEquals(4, withKafkaOffsetColumns.size() - columns.size());
List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN);
assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size()));
+
+ // scenario with null kafka key
+ sendMessagesToKafkaWithNullKafkaKey(topic, numMessages, numPartitions);
+ AvroKafkaSource avroKafkaSourceWithNullKafkaKey = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ SourceFormatAdapter kafkaSourceWithNullKafkaKey = new SourceFormatAdapter(avroKafkaSourceWithNullKafkaKey);
+ Dataset<Row> nullKafkaKeyDataset = kafkaSourceWithNullKafkaKey.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
+     .getBatch().get();
+ assertEquals(numMessages, nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
+
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 5b0e7667fc0..60887613d64 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -66,6 +66,7 @@ import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
+import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -206,6 +207,11 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
testUtils.sendMessages(topic, jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions));
}
+ void sendNullKafkaKeyMessagesToKafka(String topic, int count, int numPartitions) {
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ testUtils.sendMessages(topic, jsonifyRecordsByPartitionsWithNullKafkaKey(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions));
+ }
+
void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) {
try {
Tuple2<String, String>[] keyValues = new Tuple2[count];
@@ -339,7 +345,15 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN);
assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size()));
+ // scenario with null kafka key
+ sendNullKafkaKeyMessagesToKafka(topic, numMessages, numPartitions);
+ jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+ kafkaSource = new SourceFormatAdapter(jsonSource);
+ Dataset<Row> dfWithOffsetInfoAndNullKafkaKey = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache();
+ assertEquals(numMessages, dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is null").count());
+
dfNoOffsetInfo.unpersist();
dfWithOffsetInfo.unpersist();
+ dfWithOffsetInfoAndNullKafkaKey.unpersist();
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index b9555cb29c2..058ed72a3be 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -447,6 +447,15 @@ public class UtilitiesTestBase {
return data;
}
+ public static Tuple2<String, String>[] jsonifyRecordsByPartitionsWithNullKafkaKey(List<HoodieRecord> records, int partitions) {
+ Tuple2<String, String>[] data = new Tuple2[records.size()];
+ for (int i = 0; i < records.size(); i++) {
+ String value = Helpers.toJsonString(records.get(i));
+ data[i] = new Tuple2<>(null, value);
+ }
+ return data;
+ }
+
private static void addAvroRecord(
VectorizedRowBatch batch,
GenericRecord record,