This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 66879cd1ac7 [HUDI-6828] Fix wrong partitionToReplaceIds when insertOverwrite empty data into partitions (#9811)
66879cd1ac7 is described below

commit 66879cd1ac78d702a06fc9cfeb1c4e0e5c47a4db
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Oct 9 10:57:56 2023 +0800

    [HUDI-6828] Fix wrong partitionToReplaceIds when insertOverwrite empty data into partitions (#9811)
---
 .../apache/hudi/config/HoodieInternalConfig.java   |  6 ++
 .../SparkInsertOverwriteCommitActionExecutor.java  | 16 ++++-
 ...setBulkInsertOverwriteCommitActionExecutor.java | 15 ++++-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 12 +++-
 .../command/InsertIntoHoodieTableCommand.scala     | 20 +++++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 72 ++++++++++++----------
 6 files changed, 99 insertions(+), 42 deletions(-)
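
Context: before this fix, an INSERT OVERWRITE whose source query returned no
rows produced no write statuses, so the partition-to-replaced-file-ids map came
back empty and the target partition's existing files were never marked as
replaced. A minimal Spark SQL sketch of the scenario, adapted from the test in
this patch (table names are illustrative):

    spark.sql("insert into target values(3, 'c1', 13, '2021-07-17')")
    // The select below returns no rows; the partition should still be cleared.
    spark.sql("insert overwrite table target partition(dt='2021-07-17') " +
      "select id, name, price from input_table")
    // Expected after this fix: partition dt='2021-07-17' is empty.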

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
index 797df196441..c34d8e45836 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
@@ -46,6 +46,12 @@ public class HoodieInternalConfig extends HoodieConfig {
       .withDocumentation("For SQL operations, if enables bulk_insert 
operation, "
           + "this configure will take effect to decide overwrite whole table 
or partitions specified");
 
+  public static final ConfigProperty<String> STATIC_OVERWRITE_PARTITION_PATHS 
= ConfigProperty
+      .key("hoodie.static.overwrite.partition.paths")
+      .defaultValue("")
+      .markAdvanced()
+      .withDocumentation("Inner configure to pass static partition paths to 
executors for SQL operations.");
+
   /**
    * Returns if partition records are sorted or not.
    *
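
The new property is an internal carrier: the SQL layer fills it with the fully
static partition path, and the commit executors read it back when an overwrite
produces no write statuses. A Scala sketch of the read side, mirroring the
executor changes later in this patch (`config` here is assumed to be the
executor's HoodieWriteConfig):

    val staticOverwritePartition =
      config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS)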
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index b265b32da8e..d12efab229d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.table.HoodieTable;
@@ -34,6 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.Partitioner;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -78,8 +81,17 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
-    return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
-        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    if (writeMetadata.getWriteStatuses().isEmpty()) {
+      String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+      if (StringUtils.isNullOrEmpty(staticOverwritePartition)) {
+        return Collections.emptyMap();
+      } else {
+        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
+      }
+    } else {
+      return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+          Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    }
+    }
   }
 
   protected List<String> getAllExistingFileIds(String partitionPath) {
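
The bulk-insert executor below gets the same fallback. A simplified Scala
sketch of the decision both executors now make (names shortened;
`existingFileIds` stands in for getAllExistingFileIds):

    def partitionToReplacedFileIds(writtenPartitions: Seq[String],
                                   staticOverwritePartition: String,
                                   existingFileIds: String => List[String]): Map[String, List[String]] = {
      if (writtenPartitions.isEmpty) {
        // Empty write: fall back to the static partition path carried in the
        // write config, so the overwritten partition's files are still replaced.
        if (staticOverwritePartition == null || staticOverwritePartition.isEmpty) Map.empty
        else Map(staticOverwritePartition -> existingFileIds(staticOverwritePartition))
      } else {
        // Non-empty write: replace files in every partition actually written.
        writtenPartitions.distinct.map(p => p -> existingFileIds(p)).toMap
      }
    }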
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index a9f14d1e3e4..c1fd952b106 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -27,11 +27,13 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -58,8 +60,17 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetB
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
-        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    if (writeStatuses.isEmpty()) {
+      String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+      if (staticOverwritePartition == null || staticOverwritePartition.isEmpty()) {
+        return Collections.emptyMap();
+      } else {
+        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
+      }
+    } else {
+      return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+          Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    }
   }
 
   protected List<String> getAllExistingFileIds(String partitionPath) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 4eb8d2b1d1e..a34a6dfb052 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -164,7 +164,8 @@ trait ProvidesHoodieConfig extends Logging {
                               isOverwritePartition: Boolean,
                               isOverwriteTable: Boolean,
                               insertPartitions: Map[String, Option[String]] = Map.empty,
-                              extraOptions: Map[String, String]): Map[String, String] = {
+                              extraOptions: Map[String, String],
+                              staticOverwritePartitionPathOpt: Option[String] = Option.empty): Map[String, String] = {
 
     if (insertPartitions.nonEmpty &&
       (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
@@ -256,6 +257,13 @@ trait ProvidesHoodieConfig extends Logging {
       Map()
     }
 
+    val staticOverwritePartitionPathOptions = staticOverwritePartitionPathOpt match {
+      case Some(staticOverwritePartitionPath) =>
+        Map(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.key() -> staticOverwritePartitionPath)
+      case _ =>
+        Map()
+    }
+
     // try to use new insert dup policy instead of legacy insert mode to deduce payload class. If only insert mode is explicitly specified,
     // w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set,
     // or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode.
@@ -304,7 +312,7 @@ trait ProvidesHoodieConfig extends Logging {
       RECORDKEY_FIELD.key -> recordKeyConfigValue,
       PRECOMBINE_FIELD.key -> preCombineField,
       PARTITIONPATH_FIELD.key -> partitionFieldsStr
-    ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow, combinedOpts)
+    ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow, combinedOpts) ++ staticOverwritePartitionPathOptions
 
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
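
With this change, a fully static overwrite threads its partition path into the
writer options. A hedged usage sketch of the extended signature (argument
values are illustrative, the "dt=..." path format is an assumption; the real
call site is in the next hunk):

    val config = buildHoodieInsertConfig(catalogTable, sparkSession,
      isOverwritePartition = true, isOverwriteTable = false,
      insertPartitions = Map("dt" -> Some("2021-07-17")),
      extraOptions = Map.empty,
      staticOverwritePartitionPathOpt = Some("dt=2021-07-17"))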
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 29f27aa0bec..b8d5be7638f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -100,8 +100,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
         isOverWritePartition = true
       }
     }
-
-    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
+    val staticOverwritePartitionPathOpt = getStaticOverwritePartitionPath(catalogTable, partitionSpec, isOverWritePartition)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
     val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
 
@@ -118,6 +118,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     success
   }
 
+  private def getStaticOverwritePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
+                                              partitionsSpec: Map[String, Option[String]],
+                                              isOverWritePartition: Boolean): Option[String] = {
+    if (isOverWritePartition) {
+      val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+      val isStaticOverwritePartition = staticPartitionValues.keys.size == hoodieCatalogTable.partitionFields.length
+      if (isStaticOverwritePartition) {
+        Option.apply(makePartitionPath(hoodieCatalogTable, staticPartitionValues))
+      } else {
+        Option.empty
+      }
+    } else {
+      Option.empty
+    }
+  }
+
   /**
   * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
    *
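
For reference, the new helper returns a path only when every partition column
is bound to a static value. A simplified sketch of that rule (the real path is
built by makePartitionPath, which also handles encoding; the "col=value"
format below is an assumption):

    def staticOverwritePartitionPath(spec: Map[String, Option[String]],
                                     partitionFields: Seq[String]): Option[String] = {
      // Keep only columns given static values, e.g. partition(dt='2021-07-17').
      val staticValues = spec.collect { case (k, Some(v)) => k -> v }
      if (staticValues.size == partitionFields.length) {
        Some(partitionFields.map(f => s"$f=${staticValues(f)}").mkString("/"))
      } else {
        None // at least one dynamic column, e.g. partition(dt)
      }
    }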
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a057efdd078..1a925827088 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -880,30 +880,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test insert overwrite partitions with empty dataset") {
-    withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value()) {
-      withRecordType()(withTempDir { tmp =>
-        Seq("cow", "mor").foreach { tableType =>
-          withTable(generateTableName) { inputTable =>
-            spark.sql(
-              s"""
-                 |create table $inputTable (
-                 |  id int,
-                 |  name string,
-                 |  price double,
-                 |  dt string
-                 |) using hudi
-                 | tblproperties (
-                 |  type = '$tableType',
-                 |  primaryKey = 'id'
-                 | )
-                 | partitioned by (dt)
-                 | location '${tmp.getCanonicalPath}/$inputTable'
-              """.stripMargin)
-
-            withTable(generateTableName) { target =>
+    Seq(true, false).foreach { enableBulkInsert =>
+      val bulkInsertConf: Array[(String, String)] = if (enableBulkInsert) {
+        Array(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value())
+      } else {
+        Array()
+      }
+      withSQLConf(bulkInsertConf: _*) {
+        withRecordType()(withTempDir { tmp =>
+          Seq("cow", "mor").foreach { tableType =>
+            withTable(generateTableName) { inputTable =>
               spark.sql(
                 s"""
-                   |create table $target (
+                   |create table $inputTable (
                    |  id int,
                    |  name string,
                    |  price double,
@@ -914,21 +903,36 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
                    |  primaryKey = 'id'
                    | )
                    | partitioned by (dt)
-                   | location '${tmp.getCanonicalPath}/$target'
+                   | location '${tmp.getCanonicalPath}/$inputTable'
               """.stripMargin)
-              spark.sql(s"insert into $target values(3, 'c1', 13, 
'2021-07-17')")
-              spark.sql(s"insert into $target values(1, 'a1', 10, 
'2021-07-18')")
-
-              // Insert overwrite a partition with empty record
-              spark.sql(s"insert overwrite table $target 
partition(dt='2021-07-17') select id, name, price from $inputTable")
-              // TODO enable result check after fix 
https://issues.apache.org/jira/browse/HUDI-6828
-              //  checkAnswer(s"select id, name, price, dt from $target order 
by id")(
-              //    Seq(1, "a1", 10.0, "2021-07-18")
-              //  )
+
+              withTable(generateTableName) { target =>
+                spark.sql(
+                  s"""
+                     |create table $target (
+                     |  id int,
+                     |  name string,
+                     |  price double,
+                     |  dt string
+                     |) using hudi
+                     | tblproperties (
+                     |  type = '$tableType',
+                     |  primaryKey = 'id'
+                     | )
+                     | partitioned by (dt)
+                     | location '${tmp.getCanonicalPath}/$target'
+              """.stripMargin)
+                spark.sql(s"insert into $target values(3, 'c1', 13, 
'2021-07-17')")
+                spark.sql(s"insert into $target values(1, 'a1', 10, 
'2021-07-18')")
+
+                // Insert overwrite a partition with empty record
+                spark.sql(s"insert overwrite table $target 
partition(dt='2021-07-17') select id, name, price from $inputTable")
+                checkAnswer(s"select id, name, price, dt from $target where 
dt='2021-07-17'")(Seq.empty: _*)
+              }
             }
           }
-        }
-      })
+        })
+      }
     }
   }
 
