yihua commented on code in PR #7370:
URL: https://github.com/apache/hudi/pull/7370#discussion_r1049000214


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 
     val updatedSchema = StructType(metaFields ++ schema.fields)
 
-    val updatedDF = if (populateMetaFields && 
config.shouldCombineBeforeInsert) {
-      val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, 
config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+    val updatedDF = if (populateMetaFields) {
+      val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+        "Key-generator class name is required")
+
+      val prependedRdd: RDD[InternalRow] =
+        df.queryExecution.toRdd.mapPartitions { iter =>
+          val keyGenerator =
+            ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
+              .asInstanceOf[SparkKeyGeneratorInterface]
+
+          iter.map { row =>
+            val recordKey = keyGenerator.getRecordKey(row, schema)
+            val partitionPath = keyGenerator.getPartitionPath(row, schema)
+            val commitTimestamp = UTF8String.EMPTY_UTF8
+            val commitSeqNo = UTF8String.EMPTY_UTF8
+            val filename = UTF8String.EMPTY_UTF8
+
+            // TODO use mutable row, avoid re-allocating

Review Comment:
   Makes sense.  In general, we should only limit TODO to tiny stuff, as Jira 
tickets have better traceability and assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to