danny0405 commented on code in PR #9403:
URL: https://github.com/apache/hudi/pull/9403#discussion_r1289505376
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -54,21 +56,23 @@ public static boolean shouldAddOffsets(TypedProperties
props) {
public static final String KAFKA_SOURCE_OFFSET_COLUMN =
"_hoodie_kafka_source_offset";
public static final String KAFKA_SOURCE_PARTITION_COLUMN =
"_hoodie_kafka_source_partition";
public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN =
"_hoodie_kafka_source_timestamp";
+ public static final String KAFKA_SOURCE_KEY_COLUMN =
"_hoodie_kafka_source_key";
public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext
jssc) {
super(props, jssc);
}
@Override
public Schema processSchema(Schema schema) {
- // this method adds kafka offset fields namely source offset, partition
and timestamp to the schema of the batch.
+ // this method adds kafka offset fields namely source offset, partition,
timestamp and kafka message key to the schema of the batch.
try {
List<Schema.Field> fieldList = schema.getFields();
List<Schema.Field> newFieldList = fieldList.stream()
.map(f -> new Schema.Field(f.name(), f.schema(), f.doc(),
f.defaultVal())).collect(Collectors.toList());
newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN,
Schema.create(Schema.Type.LONG), "offset column", 0));
newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN,
Schema.create(Schema.Type.INT), "partition column", 0));
newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN,
Schema.create(Schema.Type.LONG), "timestamp column", 0));
+ newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN,
createNullableSchema(Schema.Type.STRING), "kafka key column",
JsonProperties.NULL_VALUE));
Schema newSchema = Schema.createRecord(schema.getName() + "_processed",
schema.getDoc(), schema.getNamespace(), false, newFieldList);
Review Comment:
Is the key always a string type? Could it be bytes in Kafka?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]