This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 97f21f85e9596aebee756d10a4a1ad5c229c1fae Author: Prathit malik <[email protected]> AuthorDate: Tue Aug 15 07:37:26 2023 +0530 [HUDI-6683] Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource (#9403) --- .../hudi/utilities/schema/KafkaOffsetPostProcessor.java | 6 +++++- .../org/apache/hudi/utilities/sources/JsonKafkaSource.java | 3 +++ .../apache/hudi/utilities/sources/helpers/AvroConvertor.java | 3 +++ .../apache/hudi/utilities/sources/TestAvroKafkaSource.java | 11 ++++++----- .../apache/hudi/utilities/sources/TestJsonKafkaSource.java | 9 +++++---- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java index 63473c3bce8..500bb0c7f99 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java @@ -18,6 +18,7 @@ package org.apache.hudi.utilities.schema; +import org.apache.avro.JsonProperties; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.internal.schema.HoodieSchemaException; @@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.stream.Collectors; +import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; /** @@ -54,6 +56,7 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor { public static final String KAFKA_SOURCE_OFFSET_COLUMN = "_hoodie_kafka_source_offset"; public static final String KAFKA_SOURCE_PARTITION_COLUMN = "_hoodie_kafka_source_partition"; public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = "_hoodie_kafka_source_timestamp"; + public static final String KAFKA_SOURCE_KEY_COLUMN = "_hoodie_kafka_source_key"; public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); @@ -61,7 +64,7 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor { @Override public Schema processSchema(Schema schema) { - // this method adds kafka offset fields namely source offset, partition and timestamp to the schema of the batch. + // this method adds kafka offset fields namely source offset, partition, timestamp and kafka message key to the schema of the batch. try { List<Schema.Field> fieldList = schema.getFields(); List<Schema.Field> newFieldList = fieldList.stream() @@ -69,6 +72,7 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor { newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0)); newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0)); newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0)); + newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE)); Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList); return newSchema; } catch (Exception e) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 775bd095fe0..de67dc171a9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -47,6 +47,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; +import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN; /** * Read json kafka data. @@ -80,11 +81,13 @@ public class JsonKafkaSource extends KafkaSource<String> { ObjectMapper om = new ObjectMapper(); partitionIterator.forEachRemaining(consumerRecord -> { String record = consumerRecord.value().toString(); + String recordKey = (String) consumerRecord.key(); try { ObjectNode jsonNode = (ObjectNode) om.readTree(record); jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset()); jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition()); jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp()); + jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey); stringList.add(om.writeValueAsString(jsonNode)); } catch (Throwable e) { stringList.add(record); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java index 857eb3c3f2f..1a7daaa7bca 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java @@ -41,6 +41,7 @@ import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; +import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN; /** * Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy fields to circumvent issues around @@ -175,9 +176,11 @@ public class AvroConvertor implements Serializable { for (Schema.Field field : record.getSchema().getFields()) { recordBuilder.set(field, record.get(field.name())); } + recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset()); recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition()); recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp()); + recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key())); return recordBuilder.build(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java index f57f87e58bc..2632f72659b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; +import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; @@ -145,7 +146,7 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>()); avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null); GenericRecord withKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0); - assertEquals(3,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size()); + assertEquals(4,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size()); } @Test @@ -180,9 +181,9 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size()); } List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList()); - assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN,"city_to_state").except(c.drop("city_to_state")).count()); - assertEquals(3, withKafkaOffsetColumns.size() - columns.size()); - List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN); - assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size())); + assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN,"city_to_state").except(c.drop("city_to_state")).count()); + assertEquals(4, withKafkaOffsetColumns.size() - columns.size()); + List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN); + assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size())); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index e806b02c69c..5b0e7667fc0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -63,6 +63,7 @@ import static org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_CO import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; +import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN; import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords; import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -331,12 +332,12 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { assertEquals(numMessages / numPartitions, dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count()); } assertEquals(0, dfWithOffsetInfo - .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN) + .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN) .except(dfNoOffsetInfo).count()); List<String> withKafkaOffsetColumns = Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList()); - assertEquals(3, withKafkaOffsetColumns.size() - columns.size()); - List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN); - assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size())); + assertEquals(4, withKafkaOffsetColumns.size() - columns.size()); + List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN); + assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size())); dfNoOffsetInfo.unpersist(); dfWithOffsetInfo.unpersist();
