nsivabalan commented on code in PR #8107:
URL: https://github.com/apache/hudi/pull/8107#discussion_r1185065540
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1118,44 +1124,70 @@ object HoodieSparkSqlWriter {
Some(writerSchema))
avroRecords.mapPartitions(it => {
+ val sparkPartitionId = TaskContext.getPartitionId()
+
val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
val consistentLogicalTimestampEnabled = parameters.getOrElse(
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
- it.map { avroRecord =>
+ // generate record keys if auto generation is enabled.
+    val recordsWithRecordKeyOverride = mayBeAutoGenerateRecordKeys(autoGenerateRecordKeys, it, instantTime, sparkPartitionId)
+
+ // handle dropping partition columns
+ recordsWithRecordKeyOverride.map { avroRecordRecordKeyOverRide =>
val processedRecord = if (shouldDropPartitionColumns) {
- HoodieAvroUtils.rewriteRecord(avroRecord, dataFileSchema)
+        HoodieAvroUtils.rewriteRecord(avroRecordRecordKeyOverRide._1, dataFileSchema)
+ } else {
+ avroRecordRecordKeyOverRide._1
+ }
+
+ // Generate HoodieKey for records
Review Comment:
Nope. As I called out, the keyGen class is meant to be invoked for each record separately, but here we call it within the mapPartitions call and generate the key based on the row id.
Without touching the key gen class APIs or the constructor used for key gen instantiation, it might be tough, but let's chat to see if we can do it.
I would also love to conceal this logic, or at least keep it in one place, if feasible.
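For context, the key scheme under discussion can be sketched roughly as below. This is a minimal, hypothetical illustration of generating keys per partition from the commit instant time, the Spark partition id, and a running row counter; `AutoRecordKeySketch` and the exact `instantTime_partitionId_rowId` format are assumptions for illustration, not Hudi's actual implementation:

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: inside a mapPartitions call, each record key
// combines the commit instant time, the Spark partition id, and a
// monotonically increasing per-partition row id, so keys are unique
// without invoking a key generator per record.
public class AutoRecordKeySketch {
  public static Iterator<String> keysFor(Iterator<?> records,
                                         String instantTime,
                                         int sparkPartitionId) {
    AtomicLong rowId = new AtomicLong(0);
    return new Iterator<String>() {
      public boolean hasNext() { return records.hasNext(); }
      public String next() {
        records.next(); // consume the record; only the key is produced here
        return instantTime + "_" + sparkPartitionId + "_" + rowId.getAndIncrement();
      }
    };
  }
}
```

This shows why the comment above flags the design: the key depends on the row's position within the partition rather than on record fields, which is what makes it awkward to route through the per-record key generator APIs.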
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]