danny0405 commented on code in PR #9403:
URL: https://github.com/apache/hudi/pull/9403#discussion_r1289505376


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -54,21 +56,23 @@ public static boolean shouldAddOffsets(TypedProperties props) {
   public static final String KAFKA_SOURCE_OFFSET_COLUMN = "_hoodie_kafka_source_offset";
   public static final String KAFKA_SOURCE_PARTITION_COLUMN = "_hoodie_kafka_source_partition";
   public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = "_hoodie_kafka_source_timestamp";
+  public static final String KAFKA_SOURCE_KEY_COLUMN = "_hoodie_kafka_source_key";
 
   public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
   }
 
   @Override
   public Schema processSchema(Schema schema) {
-    // this method adds kafka offset fields namely source offset, partition and timestamp to the schema of the batch.
+    // this method adds kafka offset fields namely source offset, partition, timestamp and kafka message key to the schema of the batch.
     try {
       List<Schema.Field> fieldList = schema.getFields();
       List<Schema.Field> newFieldList = fieldList.stream()
           .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList());
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE));
       Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);

Review Comment:
   Is the key always a string type? Could it be bytes in Kafka?
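
   To illustrate the concern: Kafka carries message keys as raw bytes on the wire, and the configured key deserializer decides how they are interpreted. The sketch below is standalone and uses only the JDK. The class and method names (`KafkaKeyRoundTrip`, `survivesUtf8RoundTrip`) are hypothetical and not part of Hudi or Kafka. It only shows that a key that is not valid UTF-8 cannot be recovered once it has been decoded into a string, which is what a nullable STRING column would presumably store.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KafkaKeyRoundTrip {

    // Hypothetical helper: decodes the raw key bytes as UTF-8, re-encodes them,
    // and reports whether the original bytes came back unchanged.
    static boolean survivesUtf8RoundTrip(byte[] rawKey) {
        if (rawKey == null) {
            return true; // a null key maps cleanly onto a nullable column
        }
        String asString = new String(rawKey, StandardCharsets.UTF_8);
        return Arrays.equals(rawKey, asString.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // A textual key is preserved by a string column.
        byte[] textKey = "order-42".getBytes(StandardCharsets.UTF_8);
        // An arbitrary binary key: 0xFF and 0xFE are not valid UTF-8, so decoding
        // replaces them with U+FFFD and the original bytes are lost.
        byte[] binaryKey = {(byte) 0xFF, (byte) 0xFE, 0x00, 0x01};

        System.out.println(survivesUtf8RoundTrip(textKey));   // prints "true"
        System.out.println(survivesUtf8RoundTrip(binaryKey)); // prints "false"
    }
}
```

   If binary keys need to be supported, one option would be a nullable BYTES column instead of STRING; whether that fits the source implementations here is for the PR author to confirm.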



