codope commented on code in PR #7370:
URL: https://github.com/apache/hudi/pull/7370#discussion_r1042372321


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala:
##########
@@ -30,6 +30,10 @@ import org.apache.spark.util.MutablePair
  */
 object HoodieUnsafeUtils {
 
+  // TODO scala-doc

Review Comment:
   Let's add doc or remove the comment.



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala:
##########
@@ -85,7 +83,8 @@ case class CreateHoodieTableAsSelectCommand(
     val newTable = table.copy(
       identifier = tableIdentWithDB,
       storage = newStorage,
-      schema = reOrderedQuery.schema,
+      // TODO add meta-fields

Review Comment:
   Will this be taken up in the PR stacked on top of this one?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 
     val updatedSchema = StructType(metaFields ++ schema.fields)
 
-    val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
-      val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+    val updatedDF = if (populateMetaFields) {
+      val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+        "Key-generator class name is required")
+
+      val prependedRdd: RDD[InternalRow] =
+        df.queryExecution.toRdd.mapPartitions { iter =>
+          val keyGenerator =
+            ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+              .asInstanceOf[SparkKeyGeneratorInterface]
+
+          iter.map { row =>
+            val recordKey = keyGenerator.getRecordKey(row, schema)
+            val partitionPath = keyGenerator.getPartitionPath(row, schema)
+            val commitTimestamp = UTF8String.EMPTY_UTF8
+            val commitSeqNo = UTF8String.EMPTY_UTF8
+            val filename = UTF8String.EMPTY_UTF8
+
+            // TODO use mutable row, avoid re-allocating
+            new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
+          }
+        }
+
+      val dedupedRdd = if (config.shouldCombineBeforeInsert) {
+        dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+      } else {
+        prependedRdd
+      }
+
       HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
     } else {
-      HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema)
+      // NOTE: In cases when we're not populating meta-fields we actually don't
+      //       need access to the [[InternalRow]] and therefore can avoid the need
+      //       to dereference [[DataFrame]] into [[RDD]]
+      val query = df.queryExecution.logical
+      val metaFieldsStubs = metaFields.map(f => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)())
+      val prependedQuery = Project(metaFieldsStubs ++ query.output, query)
+
+      HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to