boneanxs commented on code in PR #10254:
URL: https://github.com/apache/hudi/pull/10254#discussion_r1422307308


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -334,42 +336,51 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
-  def deduceIsOverwriteTable(sparkSession: SparkSession,
-                             catalogTable: HoodieCatalogTable,
-                             partitionSpec: Map[String, Option[String]],
-                             extraOptions: Map[String, String]): Boolean = {
+  /**
+   * Deduce the overwrite config based on writeOperation and overwriteMode 
config.
+   * The returned staticOverwritePartitionPathOpt is defined only in static 
insert_overwrite case.
+   *
+   * @return (overwriteMode, isOverWriteTable, isOverWritePartition, 
staticOverwritePartitionPathOpt)
+   */
+  def deduceOverwriteConfig(sparkSession: SparkSession,
+                            catalogTable: HoodieCatalogTable,
+                            partitionSpec: Map[String, Option[String]],
+                            extraOptions: Map[String, String]): (SaveMode, 
Boolean, Boolean, Option[String]) = {
     val combinedOpts: Map[String, String] = combineOptions(catalogTable, 
catalogTable.tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = Map.empty, overridingOpts = extraOptions)
     val operation = combinedOpts.getOrElse(OPERATION.key, null)
-    operation match {
-      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
-        true
-      case INSERT_OVERWRITE_OPERATION_OPT_VAL =>
-        false
+    // If hoodie.datasource.overwrite.mode configured, respect it, otherwise 
respect spark.sql.sources.partitionOverwriteMode
+    val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
+      
sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
+    val isStaticOverwrite = hoodieOverwriteMode match {
+      case "STATIC" => true
+      case "DYNAMIC" => false
+      case _ => throw new IllegalArgumentException("Config 
hoodie.datasource.overwrite.mode is illegal")
+    }
+    val isOverWriteTable = operation match {
+      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL => true
+      case INSERT_OVERWRITE_OPERATION_OPT_VAL => false
       case _ =>
-        // NonPartitioned table always insert overwrite whole table
-        if (catalogTable.partitionFields.isEmpty) {
-          true
-        } else {
-          // Insert overwrite partitioned table with PARTITION clause will 
always insert overwrite the specific partition
-          if (partitionSpec.nonEmpty) {
-            false
-          } else {
-            // If hoodie.datasource.overwrite.mode configured, respect it, 
otherwise respect spark.sql.sources.partitionOverwriteMode
-            val hoodieOverwriteMode = 
combinedOpts.getOrElse(OVERWRITE_MODE.key,
-              
sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
-
-            hoodieOverwriteMode match {
-              case "STATIC" =>
-                true
-              case "DYNAMIC" =>
-                false
-              case _ =>
-                throw new IllegalArgumentException("Config 
hoodie.datasource.overwrite.mode is illegal")
-            }
-          }
-        }
+        // There are two cases where we need use insert_overwrite_table
+        // 1. NonPartitioned table always insert overwrite whole table
+        // 2. static mode and no partition values specified
+        catalogTable.partitionFields.isEmpty || (isStaticOverwrite && 
partitionSpec.isEmpty)
+    }
+    val overwriteMode = if (isOverWriteTable) SaveMode.Overwrite else 
SaveMode.Append
+    val staticPartitions = if (isStaticOverwrite && !isOverWriteTable) {
+      val fileIndex = HoodieFileIndex(sparkSession, catalogTable.metaClient, 
None, combinedOpts, FileStatusCache.getOrCreate(sparkSession))

Review Comment:
   cc @beyond1920, this PR tries to align with Spark's behavior: only under static
overwrite mode does inserting an empty dataset via insert overwrite replace the old
data. This is a breaking change relative to https://github.com/apache/hudi/pull/9811.
Is that acceptable to you?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -88,19 +88,11 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig wi
           extraOptions: Map[String, String] = Map.empty): Boolean = {
     val catalogTable = new HoodieCatalogTable(sparkSession, table)
 
-    var mode = SaveMode.Append
-    var isOverWriteTable = false
-    var isOverWritePartition = false
-
-    if (overwrite) {
-      if (deduceIsOverwriteTable(sparkSession, catalogTable, partitionSpec, 
extraOptions)) {
-        isOverWriteTable = true
-        mode = SaveMode.Overwrite
-      } else {
-        isOverWritePartition = true
-      }
+    val (mode, isOverWriteTable, isOverWritePartition, 
staticOverwritePartitionPathOpt) = if (overwrite) {
+      deduceOverwriteConfig(sparkSession, catalogTable, partitionSpec, 
extraOptions)
+    } else {
+      (SaveMode.Append, false, false, Option.empty)
     }
-    val staticOverwritePartitionPathOpt = 
getStaticOverwritePartitionPath(catalogTable, partitionSpec, 
isOverWritePartition)

Review Comment:
   Can the old `getStaticOverwritePartitionPath` method be deleted now that this call site no longer uses it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to