yihua commented on code in PR #7370:
URL: https://github.com/apache/hudi/pull/7370#discussion_r1042778288


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -121,6 +120,7 @@ object HoodieSparkSqlWriter {
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
     var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up

Review Comment:
   nit: remove this?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala:
##########
@@ -72,47 +71,44 @@ case class CreateHoodieTableAsSelectCommand(
       }
     }
 
-    // ReOrder the query which move the partition columns to the last of the 
project list
-    val reOrderedQuery = reOrderPartitionColumn(query, 
table.partitionColumnNames)
     // Remove some properties should not be used
-    val newStorage = new CatalogStorageFormat(
-      table.storage.locationUri,
-      table.storage.inputFormat,
-      table.storage.outputFormat,
-      table.storage.serde,
-      table.storage.compressed,
-      table.storage.properties.--(needFilterProps))
-    val newTable = table.copy(
-      identifier = tableIdentWithDB,
-      storage = newStorage,
-      schema = reOrderedQuery.schema,
-      properties = table.properties.--(needFilterProps)
+    val updatedStorageFormat = table.storage.copy(
+      properties = table.storage.properties -- needFilterProps)
+
+    val updatedTable = table.copy(
+      identifier = qualifiedTableIdentifier,
+      storage = updatedStorageFormat,
+      // TODO need to add meta-fields here
+      schema = query.schema,
+      properties = table.properties -- needFilterProps
     )
 
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
-    // Execute the insert query
     try {
-      // init hoodie table
+      // Init hoodie table
       hoodieCatalogTable.initHoodieTable()
 
-      val tblProperties = hoodieCatalogTable.catalogProperties
-      val options = Map(
+      val tableProperties = hoodieCatalogTable.catalogProperties
+      // NOTE: Users might be specifying write-configuration (inadvertently) 
as options or table properties
+      //       in CTAS, therefore we need to make sure that these are 
appropriately propagated to the
+      //       write operation

Review Comment:
   Can the write config be specified in a different way with `options` to avoid 
mixing write configs with table configs?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
 
     val updatedSchema = StructType(metaFields ++ schema.fields)
 
-    val updatedDF = if (populateMetaFields && 
config.shouldCombineBeforeInsert) {
-      val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, 
config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+    val updatedDF = if (populateMetaFields) {
+      val keyGeneratorClassName = 
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+        "Key-generator class name is required")
+
+      val prependedRdd: RDD[InternalRow] =
+        df.queryExecution.toRdd.mapPartitions { iter =>
+          val keyGenerator =
+            ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
+              .asInstanceOf[SparkKeyGeneratorInterface]
+
+          iter.map { row =>
+            val recordKey = keyGenerator.getRecordKey(row, schema)
+            val partitionPath = keyGenerator.getPartitionPath(row, schema)
+            val commitTimestamp = UTF8String.EMPTY_UTF8
+            val commitSeqNo = UTF8String.EMPTY_UTF8
+            val filename = UTF8String.EMPTY_UTF8
+
+            // TODO use mutable row, avoid re-allocating

Review Comment:
   nit: Create a JIRA ticket for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to