prathit06 commented on code in PR #9403:
URL: https://github.com/apache/hudi/pull/9403#discussion_r1293013600


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -54,21 +56,23 @@ public static boolean shouldAddOffsets(TypedProperties 
props) {
   public static final String KAFKA_SOURCE_OFFSET_COLUMN = 
"_hoodie_kafka_source_offset";
   public static final String KAFKA_SOURCE_PARTITION_COLUMN = 
"_hoodie_kafka_source_partition";
   public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = 
"_hoodie_kafka_source_timestamp";
+  public static final String KAFKA_SOURCE_KEY_COLUMN = 
"_hoodie_kafka_source_key";
 
   public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext 
jssc) {
     super(props, jssc);
   }
 
   @Override
   public Schema processSchema(Schema schema) {
-    // this method adds kafka offset fields namely source offset, partition 
and timestamp to the schema of the batch.
+    // this method adds kafka offset fields namely source offset, partition, 
timestamp and kafka message key to the schema of the batch.
     try {
       List<Schema.Field> fieldList = schema.getFields();
       List<Schema.Field> newFieldList = fieldList.stream()
           .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), 
f.defaultVal())).collect(Collectors.toList());
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, 
Schema.create(Schema.Type.LONG), "offset column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, 
Schema.create(Schema.Type.INT), "partition column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, 
createNullableSchema(Schema.Type.STRING), "kafka key column", 
JsonProperties.NULL_VALUE));
       Schema newSchema = Schema.createRecord(schema.getName() + "_processed", 
schema.getDoc(), schema.getNamespace(), false, newFieldList);

Review Comment:
   Referring to the discussion 
[here](https://github.com/apache/hudi/issues/9391#issuecomment-1670514673) , 
the idea was to add `kafka key` as part of hudi metadata column & not as a 
recordKey.
   
   In order to set kafka key as record key, end user can do so by setting 
`hoodie.datasource.write.recordkey.field` to `_hoodie_kafka_source_key` , 
please refer 
[here](https://github.com/apache/hudi/issues/9391#issuecomment-1669955523) for 
more context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to