This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 97f21f85e9596aebee756d10a4a1ad5c229c1fae
Author: Prathit malik <[email protected]>
AuthorDate: Tue Aug 15 07:37:26 2023 +0530

    [HUDI-6683] Added kafka key as part of hudi metadata columns for Json & Avro KafkaSource (#9403)
---
 .../hudi/utilities/schema/KafkaOffsetPostProcessor.java       |  6 +++++-
 .../org/apache/hudi/utilities/sources/JsonKafkaSource.java    |  3 +++
 .../apache/hudi/utilities/sources/helpers/AvroConvertor.java  |  3 +++
 .../apache/hudi/utilities/sources/TestAvroKafkaSource.java    | 11 ++++++-----
 .../apache/hudi/utilities/sources/TestJsonKafkaSource.java    |  9 +++++----
 5 files changed, 22 insertions(+), 10 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index 63473c3bce8..500bb0c7f99 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.schema;
 
+import org.apache.avro.JsonProperties;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 
 /**
@@ -54,6 +56,7 @@ public class KafkaOffsetPostProcessor extends 
SchemaPostProcessor {
   public static final String KAFKA_SOURCE_OFFSET_COLUMN = 
"_hoodie_kafka_source_offset";
   public static final String KAFKA_SOURCE_PARTITION_COLUMN = 
"_hoodie_kafka_source_partition";
   public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = 
"_hoodie_kafka_source_timestamp";
+  public static final String KAFKA_SOURCE_KEY_COLUMN = 
"_hoodie_kafka_source_key";
 
   public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext 
jssc) {
     super(props, jssc);
@@ -61,7 +64,7 @@ public class KafkaOffsetPostProcessor extends 
SchemaPostProcessor {
 
   @Override
   public Schema processSchema(Schema schema) {
-    // this method adds kafka offset fields namely source offset, partition 
and timestamp to the schema of the batch.
+    // this method adds kafka offset fields namely source offset, partition, 
timestamp and kafka message key to the schema of the batch.
     try {
       List<Schema.Field> fieldList = schema.getFields();
       List<Schema.Field> newFieldList = fieldList.stream()
@@ -69,6 +72,7 @@ public class KafkaOffsetPostProcessor extends 
SchemaPostProcessor {
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, 
Schema.create(Schema.Type.LONG), "offset column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, 
Schema.create(Schema.Type.INT), "partition column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, 
createNullableSchema(Schema.Type.STRING), "kafka key column", 
JsonProperties.NULL_VALUE));
       Schema newSchema = Schema.createRecord(schema.getName() + "_processed", 
schema.getDoc(), schema.getNamespace(), false, newFieldList);
       return newSchema;
     } catch (Exception e) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 775bd095fe0..de67dc171a9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -47,6 +47,7 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 
 /**
  * Read json kafka data.
@@ -80,11 +81,13 @@ public class JsonKafkaSource extends KafkaSource<String> {
         ObjectMapper om = new ObjectMapper();
         partitionIterator.forEachRemaining(consumerRecord -> {
           String record = consumerRecord.value().toString();
+          String recordKey = (String) consumerRecord.key();
           try {
             ObjectNode jsonNode = (ObjectNode) om.readTree(record);
             jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
             jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, 
consumerRecord.partition());
             jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
consumerRecord.timestamp());
+            jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
             stringList.add(om.writeValueAsString(jsonNode));
           } catch (Throwable e) {
             stringList.add(record);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 857eb3c3f2f..1a7daaa7bca 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -41,6 +41,7 @@ import static 
org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 
 /**
  * Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy 
fields to circumvent issues around
@@ -175,9 +176,11 @@ public class AvroConvertor implements Serializable {
     for (Schema.Field field :  record.getSchema().getFields()) {
       recordBuilder.set(field, record.get(field.name()));
     }
+    
     recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
     recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, 
consumerRecord.partition());
     recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
consumerRecord.timestamp());
+    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, 
String.valueOf(consumerRecord.key()));
     return recordBuilder.build();
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index f57f87e58bc..2632f72659b 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 
@@ -145,7 +146,7 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
         
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), 
props, jsc()), props, jsc(), new ArrayList<>());
     avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), 
schemaProvider, null);
     GenericRecord withKafkaOffsets = 
avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
-    assertEquals(3,withKafkaOffsets.getSchema().getFields().size() - 
withoutKafkaOffsets.getSchema().getFields().size());
+    assertEquals(4,withKafkaOffsets.getSchema().getFields().size() - 
withoutKafkaOffsets.getSchema().getFields().size());
   }
 
   @Test
@@ -180,9 +181,9 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
       assertEquals(numMessages / numPartitions, 
d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
     }
     List<String> withKafkaOffsetColumns = 
Arrays.stream(d.columns()).collect(Collectors.toList());
-    assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, 
KAFKA_SOURCE_TIMESTAMP_COLUMN,"city_to_state").except(c.drop("city_to_state")).count());
-    assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
-    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
-    assertEquals(appendList, 
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, 
withKafkaOffsetColumns.size()));
+    assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, 
KAFKA_SOURCE_KEY_COLUMN,"city_to_state").except(c.drop("city_to_state")).count());
+    assertEquals(4, withKafkaOffsetColumns.size() - columns.size());
+    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, 
KAFKA_SOURCE_KEY_COLUMN);
+    assertEquals(appendList, 
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, 
withKafkaOffsetColumns.size()));
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index e806b02c69c..5b0e7667fc0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -63,6 +63,7 @@ import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.ENABLE_KAFKA_CO
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -331,12 +332,12 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
       assertEquals(numMessages / numPartitions, 
dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count());
     }
     assertEquals(0, dfWithOffsetInfo
-        .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, 
KAFKA_SOURCE_TIMESTAMP_COLUMN)
+        .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, 
KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN)
         .except(dfNoOffsetInfo).count());
     List<String> withKafkaOffsetColumns = 
Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList());
-    assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
-    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
-    assertEquals(appendList, 
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, 
withKafkaOffsetColumns.size()));
+    assertEquals(4, withKafkaOffsetColumns.size() - columns.size());
+    List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, 
KAFKA_SOURCE_KEY_COLUMN);
+    assertEquals(appendList, 
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, 
withKafkaOffsetColumns.size()));
 
     dfNoOffsetInfo.unpersist();
     dfWithOffsetInfo.unpersist();

Reply via email to