yihua commented on code in PR #7370:
URL: https://github.com/apache/hudi/pull/7370#discussion_r1042817361


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala:
##########
@@ -72,47 +71,44 @@ case class CreateHoodieTableAsSelectCommand(
       }
     }
 
-    // ReOrder the query which move the partition columns to the last of the project list
-    val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames)
     // Remove some properties that should not be used
-    val newStorage = new CatalogStorageFormat(
-      table.storage.locationUri,
-      table.storage.inputFormat,
-      table.storage.outputFormat,
-      table.storage.serde,
-      table.storage.compressed,
-      table.storage.properties.--(needFilterProps))
-    val newTable = table.copy(
-      identifier = tableIdentWithDB,
-      storage = newStorage,
-      schema = reOrderedQuery.schema,
-      properties = table.properties.--(needFilterProps)
+    val updatedStorageFormat = table.storage.copy(
+      properties = table.storage.properties -- needFilterProps)
+
+    val updatedTable = table.copy(
+      identifier = qualifiedTableIdentifier,
+      storage = updatedStorageFormat,
+      // TODO need to add meta-fields here
+      schema = query.schema,
+      properties = table.properties -- needFilterProps
     )
 
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
-    // Execute the insert query
     try {
-      // init hoodie table
+      // Init hoodie table
       hoodieCatalogTable.initHoodieTable()
 
-      val tblProperties = hoodieCatalogTable.catalogProperties
-      val options = Map(
+      val tableProperties = hoodieCatalogTable.catalogProperties
+      // NOTE: Users might be specifying write-configuration (inadvertently) as options or table properties
+      //       in CTAS, therefore we need to make sure that these are appropriately propagated to the
+      //       write operation

Review Comment:
   Makes sense to me now.
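
   For context on the NOTE in the hunk above, here is a minimal sketch of the propagation idea (not the actual Hudi code; `PropagateWriteConfigSketch` and `mergeWriteConfigs` are hypothetical names): table properties form the base configuration, and options passed explicitly to the write override them.

   ```scala
   object PropagateWriteConfigSketch {
     // Table properties form the base; explicit write options override them.
     def mergeWriteConfigs(tableProperties: Map[String, String],
                           options: Map[String, String]): Map[String, String] =
       tableProperties ++ options

     def main(args: Array[String]): Unit = {
       val merged = mergeWriteConfigs(
         Map("hoodie.datasource.write.operation" -> "upsert"),
         Map("hoodie.datasource.write.operation" -> "bulk_insert"))
       // Prints "bulk_insert": the explicitly passed option takes precedence
       println(merged("hoodie.datasource.write.operation"))
     }
   }
   ```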



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 
     val updatedSchema = StructType(metaFields ++ schema.fields)
 
-    val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
-      val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+    val updatedDF = if (populateMetaFields) {
+      val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+        "Key-generator class name is required")
+
+      val prependedRdd: RDD[InternalRow] =
+        df.queryExecution.toRdd.mapPartitions { iter =>
+          val keyGenerator =
+            ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+              .asInstanceOf[SparkKeyGeneratorInterface]
+
+          iter.map { row =>
+            val recordKey = keyGenerator.getRecordKey(row, schema)
+            val partitionPath = keyGenerator.getPartitionPath(row, schema)
+            val commitTimestamp = UTF8String.EMPTY_UTF8
+            val commitSeqNo = UTF8String.EMPTY_UTF8
+            val filename = UTF8String.EMPTY_UTF8
+
+            // TODO use mutable row, avoid re-allocating

Review Comment:
   If it's meant for you, could you add `TODO(<name_handle>)` so that we know you plan to take it up later?
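
   For reference, a hedged sketch of the meta-field prepending pattern in the hunk above, in plain Scala rather than Spark internals (the `KeyGen` trait and the `Map`-based row representation are stand-ins, not Hudi's `SparkKeyGeneratorInterface`): the record key and partition path are computed once per row, while the remaining meta fields stay empty until write time.

   ```scala
   object MetaFieldPrependSketch {
     // Stand-in for Hudi's key generator; method names are illustrative only.
     trait KeyGen {
       def recordKey(row: Map[String, Any]): String
       def partitionPath(row: Map[String, Any]): String
     }

     def prependMetaFields(rows: Iterator[Map[String, Any]],
                           keyGen: KeyGen): Iterator[Seq[Any]] =
       rows.map { row =>
         // Only the key and partition path are computable up front; commit
         // timestamp, sequence number and filename stay empty until the commit.
         Seq(keyGen.recordKey(row), keyGen.partitionPath(row), "", "", "", row)
       }
   }
   ```

   The `TODO use mutable row` in the diff presumably refers to reusing a single row buffer per partition instead of allocating a fresh object per record, which this sketch does not attempt.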


