beyond1920 commented on code in PR #10254:
URL: https://github.com/apache/hudi/pull/10254#discussion_r1429169068
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -334,42 +336,51 @@ trait ProvidesHoodieConfig extends Logging {
    }
  }
-  def deduceIsOverwriteTable(sparkSession: SparkSession,
-                             catalogTable: HoodieCatalogTable,
-                             partitionSpec: Map[String, Option[String]],
-                             extraOptions: Map[String, String]): Boolean = {
+  /**
+   * Deduce the overwrite config based on the write operation and the overwrite mode config.
+   * The returned staticOverwritePartitionPathOpt is defined only in the static insert_overwrite case.
+   *
+   * @return (overwriteMode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt)
+   */
+  def deduceOverwriteConfig(sparkSession: SparkSession,
+                            catalogTable: HoodieCatalogTable,
+                            partitionSpec: Map[String, Option[String]],
+                            extraOptions: Map[String, String]): (SaveMode, Boolean, Boolean, Option[String]) = {
    val combinedOpts: Map[String, String] = combineOptions(catalogTable, catalogTable.tableConfig, sparkSession.sqlContext.conf,
      defaultOpts = Map.empty, overridingOpts = extraOptions)
    val operation = combinedOpts.getOrElse(OPERATION.key, null)
-    operation match {
-      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
-        true
-      case INSERT_OVERWRITE_OPERATION_OPT_VAL =>
-        false
+    // If hoodie.datasource.overwrite.mode is configured, respect it; otherwise respect spark.sql.sources.partitionOverwriteMode
+    val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
+      sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
+    val isStaticOverwrite = hoodieOverwriteMode match {
+      case "STATIC" => true
+      case "DYNAMIC" => false
+      case _ => throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
+    }
+    val isOverWriteTable = operation match {
+      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL => true
+      case INSERT_OVERWRITE_OPERATION_OPT_VAL => false
      case _ =>
-        // NonPartitioned table always insert overwrite whole table
-        if (catalogTable.partitionFields.isEmpty) {
-          true
-        } else {
-          // Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
-          if (partitionSpec.nonEmpty) {
-            false
-          } else {
-            // If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
-            val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
-              sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
-
-            hoodieOverwriteMode match {
-              case "STATIC" =>
-                true
-              case "DYNAMIC" =>
-                false
-              case _ =>
-                throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
-            }
-          }
-        }
+        // There are two cases where we need to use insert_overwrite_table:
+        // 1. A non-partitioned table always insert-overwrites the whole table
+        // 2. Static mode with no partition values specified
+        catalogTable.partitionFields.isEmpty || (isStaticOverwrite && partitionSpec.isEmpty)
+    }
+    val overwriteMode = if (isOverWriteTable) SaveMode.Overwrite else SaveMode.Append
+    val staticPartitions = if (isStaticOverwrite && !isOverWriteTable) {
+      val fileIndex = HoodieFileIndex(sparkSession, catalogTable.metaClient, None, combinedOpts, FileStatusCache.getOrCreate(sparkSession))
Review Comment:
Sorry for the late response. @boneanxs I think the behavior is reasonable.
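
For anyone following along, here is a minimal sketch of how the two modes are expected to play out for a partitioned table, based on the deduction above. This is not part of the PR; the SparkSession variable, table name, columns, and data are made up for illustration.

```scala
// Illustrative only. Assumes a SparkSession `spark` with the Hudi SQL extensions enabled
// and an existing Hudi table `t1` partitioned by `dt` (hypothetical names).

// STATIC mode, no PARTITION clause: deduced as insert_overwrite_table,
// so the whole table is overwritten by the incoming rows.
spark.sql("set hoodie.datasource.overwrite.mode=STATIC")
spark.sql("insert overwrite table t1 select 1 as id, 'a1' as name, '2023-12-01' as dt")

// STATIC mode with a PARTITION clause: deduced as insert_overwrite of the
// specified partition only; other partitions are left untouched.
spark.sql("insert overwrite table t1 partition (dt = '2023-12-02') select 2 as id, 'a2' as name")

// DYNAMIC mode: deduced as insert_overwrite, and only the partitions present
// in the incoming data are expected to be replaced.
spark.sql("set hoodie.datasource.overwrite.mode=DYNAMIC")
spark.sql("insert overwrite table t1 select 3 as id, 'a3' as name, '2023-12-03' as dt")
```

In terms of the returned tuple, the two whole-table cases map to SaveMode.Overwrite, while the static-partition and dynamic cases map to SaveMode.Append, with staticOverwritePartitionPathOpt populated only in the static insert_overwrite case.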