This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4099e1d18b7 [HUDI-6683] Added kafka key as part of hudi metadata
columns for Json & Avro KafkaSource (#9403)
4099e1d18b7 is described below
commit 4099e1d18b78583d739fdb252f85b58d991d2fb0
Author: Prathit malik <[email protected]>
AuthorDate: Tue Aug 15 07:37:26 2023 +0530
[HUDI-6683] Added kafka key as part of hudi metadata columns for Json &
Avro KafkaSource (#9403)
---
.../hudi/utilities/schema/KafkaOffsetPostProcessor.java | 6 +++++-
.../org/apache/hudi/utilities/sources/JsonKafkaSource.java | 3 +++
.../apache/hudi/utilities/sources/helpers/AvroConvertor.java | 3 +++
.../apache/hudi/utilities/sources/TestAvroKafkaSource.java | 11 ++++++-----
.../apache/hudi/utilities/sources/TestJsonKafkaSource.java | 9 +++++----
5 files changed, 22 insertions(+), 10 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index 63473c3bce8..500bb0c7f99 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.schema;
+import org.apache.avro.JsonProperties;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.internal.schema.HoodieSchemaException;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
/**
@@ -54,6 +56,7 @@ public class KafkaOffsetPostProcessor extends
SchemaPostProcessor {
public static final String KAFKA_SOURCE_OFFSET_COLUMN =
"_hoodie_kafka_source_offset";
public static final String KAFKA_SOURCE_PARTITION_COLUMN =
"_hoodie_kafka_source_partition";
public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN =
"_hoodie_kafka_source_timestamp";
+ public static final String KAFKA_SOURCE_KEY_COLUMN =
"_hoodie_kafka_source_key";
public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext
jssc) {
super(props, jssc);
@@ -61,7 +64,7 @@ public class KafkaOffsetPostProcessor extends
SchemaPostProcessor {
@Override
public Schema processSchema(Schema schema) {
- // this method adds kafka offset fields namely source offset, partition
and timestamp to the schema of the batch.
+ // this method adds kafka offset fields namely source offset, partition,
timestamp and kafka message key to the schema of the batch.
try {
List<Schema.Field> fieldList = schema.getFields();
List<Schema.Field> newFieldList = fieldList.stream()
@@ -69,6 +72,7 @@ public class KafkaOffsetPostProcessor extends
SchemaPostProcessor {
newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN,
Schema.create(Schema.Type.LONG), "offset column", 0));
newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN,
Schema.create(Schema.Type.INT), "partition column", 0));
newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN,
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN,
createNullableSchema(Schema.Type.STRING), "kafka key column",
JsonProperties.NULL_VALUE));
Schema newSchema = Schema.createRecord(schema.getName() + "_processed",
schema.getDoc(), schema.getNamespace(), false, newFieldList);
return newSchema;
} catch (Exception e) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 775bd095fe0..de67dc171a9 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -47,6 +47,7 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
/**
* Read json kafka data.
@@ -80,11 +81,13 @@ public class JsonKafkaSource extends KafkaSource<String> {
ObjectMapper om = new ObjectMapper();
partitionIterator.forEachRemaining(consumerRecord -> {
String record = consumerRecord.value().toString();
+ String recordKey = (String) consumerRecord.key();
try {
ObjectNode jsonNode = (ObjectNode) om.readTree(record);
jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN,
consumerRecord.partition());
jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN,
consumerRecord.timestamp());
+ jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
stringList.add(om.writeValueAsString(jsonNode));
} catch (Throwable e) {
stringList.add(record);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 857eb3c3f2f..1a7daaa7bca 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -41,6 +41,7 @@ import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
/**
* Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy
fields to circumvent issues around
@@ -175,9 +176,11 @@ public class AvroConvertor implements Serializable {
for (Schema.Field field : record.getSchema().getFields()) {
recordBuilder.set(field, record.get(field.name()));
}
+
recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN,
consumerRecord.partition());
recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN,
consumerRecord.timestamp());
+ recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN,
String.valueOf(consumerRecord.key()));
return recordBuilder.build();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index f57f87e58bc..2632f72659b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
@@ -145,7 +146,7 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(),
props, jsc()), props, jsc(), new ArrayList<>());
avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(),
schemaProvider, null);
GenericRecord withKafkaOffsets =
avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
- assertEquals(3,withKafkaOffsets.getSchema().getFields().size() -
withoutKafkaOffsets.getSchema().getFields().size());
+ assertEquals(4,withKafkaOffsets.getSchema().getFields().size() -
withoutKafkaOffsets.getSchema().getFields().size());
}
@Test
@@ -180,9 +181,9 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
assertEquals(numMessages / numPartitions,
d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
}
List<String> withKafkaOffsetColumns =
Arrays.stream(d.columns()).collect(Collectors.toList());
- assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN,
KAFKA_SOURCE_TIMESTAMP_COLUMN,"city_to_state").except(c.drop("city_to_state")).count());
- assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
- List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
- assertEquals(appendList,
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3,
withKafkaOffsetColumns.size()));
+ assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN,
KAFKA_SOURCE_KEY_COLUMN,"city_to_state").except(c.drop("city_to_state")).count());
+ assertEquals(4, withKafkaOffsetColumns.size() - columns.size());
+ List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN,
KAFKA_SOURCE_KEY_COLUMN);
+ assertEquals(appendList,
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4,
withKafkaOffsetColumns.size()));
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index e806b02c69c..5b0e7667fc0 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -63,6 +63,7 @@ import static
org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_CO
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -331,12 +332,12 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
assertEquals(numMessages / numPartitions,
dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count());
}
assertEquals(0, dfWithOffsetInfo
- .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN,
KAFKA_SOURCE_TIMESTAMP_COLUMN)
+ .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN,
KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN)
.except(dfNoOffsetInfo).count());
List<String> withKafkaOffsetColumns =
Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList());
- assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
- List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
- assertEquals(appendList,
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3,
withKafkaOffsetColumns.size()));
+ assertEquals(4, withKafkaOffsetColumns.size() - columns.size());
+ List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN,
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN,
KAFKA_SOURCE_KEY_COLUMN);
+ assertEquals(appendList,
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4,
withKafkaOffsetColumns.size()));
dfNoOffsetInfo.unpersist();
dfWithOffsetInfo.unpersist();