yihua commented on code in PR #7370:
URL: https://github.com/apache/hudi/pull/7370#discussion_r1042778288
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -121,6 +120,7 @@ object HoodieSparkSqlWriter {
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
Review Comment:
nit: remove this TODO comment?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala:
##########
@@ -72,47 +71,44 @@ case class CreateHoodieTableAsSelectCommand(
}
}
- // ReOrder the query which move the partition columns to the last of the
project list
- val reOrderedQuery = reOrderPartitionColumn(query,
table.partitionColumnNames)
// Remove some properties should not be used
- val newStorage = new CatalogStorageFormat(
- table.storage.locationUri,
- table.storage.inputFormat,
- table.storage.outputFormat,
- table.storage.serde,
- table.storage.compressed,
- table.storage.properties.--(needFilterProps))
- val newTable = table.copy(
- identifier = tableIdentWithDB,
- storage = newStorage,
- schema = reOrderedQuery.schema,
- properties = table.properties.--(needFilterProps)
+ val updatedStorageFormat = table.storage.copy(
+ properties = table.storage.properties -- needFilterProps)
+
+ val updatedTable = table.copy(
+ identifier = qualifiedTableIdentifier,
+ storage = updatedStorageFormat,
+ // TODO need to add meta-fields here
+ schema = query.schema,
+ properties = table.properties -- needFilterProps
)
- val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
+ val hoodieCatalogTable = HoodieCatalogTable(sparkSession, updatedTable)
val tablePath = hoodieCatalogTable.tableLocation
val hadoopConf = sparkSession.sessionState.newHadoopConf()
- // Execute the insert query
try {
- // init hoodie table
+ // Init hoodie table
hoodieCatalogTable.initHoodieTable()
- val tblProperties = hoodieCatalogTable.catalogProperties
- val options = Map(
+ val tableProperties = hoodieCatalogTable.catalogProperties
+ // NOTE: Users might be specifying write-configuration (inadvertently)
as options or table properties
+ // in CTAS, therefore we need to make sure that these are
appropriately propagated to the
+ // write operation
Review Comment:
Can the write config be specified in a different way with `options` to avoid
mixing write configs with table configs?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -92,11 +69,44 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val updatedSchema = StructType(metaFields ++ schema.fields)
- val updatedDF = if (populateMetaFields &&
config.shouldCombineBeforeInsert) {
- val dedupedRdd = dedupeRows(prependedRdd, updatedSchema,
config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
+ val updatedDF = if (populateMetaFields) {
+ val keyGeneratorClassName =
config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
+ "Key-generator class name is required")
+
+ val prependedRdd: RDD[InternalRow] =
+ df.queryExecution.toRdd.mapPartitions { iter =>
+ val keyGenerator =
+ ReflectionUtils.loadClass(keyGeneratorClassName, new
TypedProperties(config.getProps))
+ .asInstanceOf[SparkKeyGeneratorInterface]
+
+ iter.map { row =>
+ val recordKey = keyGenerator.getRecordKey(row, schema)
+ val partitionPath = keyGenerator.getPartitionPath(row, schema)
+ val commitTimestamp = UTF8String.EMPTY_UTF8
+ val commitSeqNo = UTF8String.EMPTY_UTF8
+ val filename = UTF8String.EMPTY_UTF8
+
+ // TODO use mutable row, avoid re-allocating
Review Comment:
nit: Create a JIRA ticket to track this TODO?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org