This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 66879cd1ac7 [HUDI-6828] Fix wrong partitionToReplaceIds when
insertOverwrite empty data into partitions (#9811)
66879cd1ac7 is described below
commit 66879cd1ac78d702a06fc9cfeb1c4e0e5c47a4db
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Oct 9 10:57:56 2023 +0800
[HUDI-6828] Fix wrong partitionToReplaceIds when insertOverwrite empty data
into partitions (#9811)
---
.../apache/hudi/config/HoodieInternalConfig.java | 6 ++
.../SparkInsertOverwriteCommitActionExecutor.java | 16 ++++-
...setBulkInsertOverwriteCommitActionExecutor.java | 15 ++++-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 12 +++-
.../command/InsertIntoHoodieTableCommand.scala | 20 +++++-
.../apache/spark/sql/hudi/TestInsertTable.scala | 72 ++++++++++++----------
6 files changed, 99 insertions(+), 42 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
index 797df196441..c34d8e45836 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
@@ -46,6 +46,12 @@ public class HoodieInternalConfig extends HoodieConfig {
.withDocumentation("For SQL operations, if enables bulk_insert
operation, "
+ "this configure will take effect to decide overwrite whole table
or partitions specified");
+ public static final ConfigProperty<String> STATIC_OVERWRITE_PARTITION_PATHS
= ConfigProperty
+ .key("hoodie.static.overwrite.partition.paths")
+ .defaultValue("")
+ .markAdvanced()
+ .withDocumentation("Internal configuration to pass static partition paths to
executors for SQL operations.");
+
/**
* Returns if partition records are sorted or not.
*
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index b265b32da8e..d12efab229d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.table.HoodieTable;
@@ -34,6 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.Partitioner;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -78,8 +81,17 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
@Override
protected Map<String, List<String>>
getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>>
writeMetadata) {
- return
HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status ->
status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
- Pair.of(partitionPath,
getAllExistingFileIds(partitionPath)))).collectAsMap();
+ if (writeMetadata.getWriteStatuses().isEmpty()) {
+ String staticOverwritePartition =
config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+ if (StringUtils.isNullOrEmpty(staticOverwritePartition)) {
+ return Collections.emptyMap();
+ } else {
+ return Collections.singletonMap(staticOverwritePartition,
getAllExistingFileIds(staticOverwritePartition));
+ }
+ } else {
+ return
HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status ->
status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+ Pair.of(partitionPath,
getAllExistingFileIds(partitionPath)))).collectAsMap();
+ }
}
protected List<String> getAllExistingFileIds(String partitionPath) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index a9f14d1e3e4..c1fd952b106 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -27,11 +27,13 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -58,8 +60,17 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor
extends BaseDatasetB
@Override
protected Map<String, List<String>>
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
- return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status ->
status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
- Pair.of(partitionPath,
getAllExistingFileIds(partitionPath)))).collectAsMap();
+ if (writeStatuses.isEmpty()) {
+ String staticOverwritePartition =
writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+ if (staticOverwritePartition == null ||
staticOverwritePartition.isEmpty()) {
+ return Collections.emptyMap();
+ } else {
+ return Collections.singletonMap(staticOverwritePartition,
getAllExistingFileIds(staticOverwritePartition));
+ }
+ } else {
+ return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status ->
status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+ Pair.of(partitionPath,
getAllExistingFileIds(partitionPath)))).collectAsMap();
+ }
}
protected List<String> getAllExistingFileIds(String partitionPath) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 4eb8d2b1d1e..a34a6dfb052 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -164,7 +164,8 @@ trait ProvidesHoodieConfig extends Logging {
isOverwritePartition: Boolean,
isOverwriteTable: Boolean,
insertPartitions: Map[String, Option[String]] =
Map.empty,
- extraOptions: Map[String, String]): Map[String,
String] = {
+ extraOptions: Map[String, String],
+ staticOverwritePartitionPathOpt: Option[String]
= Option.empty): Map[String, String] = {
if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet !=
hoodieCatalogTable.partitionFields.toSet)) {
@@ -256,6 +257,13 @@ trait ProvidesHoodieConfig extends Logging {
Map()
}
+ val staticOverwritePartitionPathOptions = staticOverwritePartitionPathOpt
match {
+ case Some(staticOverwritePartitionPath) =>
+ Map(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.key() ->
staticOverwritePartitionPath)
+ case _ =>
+ Map()
+ }
+
// try to use new insert dup policy instead of legacy insert mode to
deduce payload class. If only insert mode is explicitly specified,
// w/o specifying any value for insert dup policy, legacy configs will be
honored. But on all other cases (i.e when neither of the configs is set,
// or when both configs are set, or when only insert dup policy is set),
we honor insert dup policy and ignore the insert mode.
@@ -304,7 +312,7 @@ trait ProvidesHoodieConfig extends Logging {
RECORDKEY_FIELD.key -> recordKeyConfigValue,
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr
- ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow,
combinedOpts)
+ ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow,
combinedOpts) ++ staticOverwritePartitionPathOptions
combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf,
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 29f27aa0bec..b8d5be7638f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -100,8 +100,8 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
isOverWritePartition = true
}
}
-
- val config = buildHoodieInsertConfig(catalogTable, sparkSession,
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
+ val staticOverwritePartitionPathOpt =
getStaticOverwritePartitionPath(catalogTable, partitionSpec,
isOverWritePartition)
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession,
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions,
staticOverwritePartitionPathOpt)
val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec,
sparkSession.sessionState.conf)
@@ -118,6 +118,22 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
success
}
+ private def getStaticOverwritePartitionPath(hoodieCatalogTable:
HoodieCatalogTable,
+ partitionsSpec: Map[String,
Option[String]],
+ isOverWritePartition: Boolean):
Option[String] = {
+ if (isOverWritePartition) {
+ val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+ val isStaticOverwritePartition = staticPartitionValues.keys.size ==
hoodieCatalogTable.partitionFields.length
+ if (isStaticOverwritePartition) {
+ Option.apply(makePartitionPath(hoodieCatalogTable,
staticPartitionValues))
+ } else {
+ Option.empty
+ }
+ } else {
+ Option.empty
+ }
+ }
+
/**
* Align provided [[query]]'s output with the expected [[catalogTable]]
schema by
*
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a057efdd078..1a925827088 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -880,30 +880,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
test("Test insert overwrite partitions with empty dataset") {
- withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key ->
WriteOperationType.BULK_INSERT.value()) {
- withRecordType()(withTempDir { tmp =>
- Seq("cow", "mor").foreach { tableType =>
- withTable(generateTableName) { inputTable =>
- spark.sql(
- s"""
- |create table $inputTable (
- | id int,
- | name string,
- | price double,
- | dt string
- |) using hudi
- | tblproperties (
- | type = '$tableType',
- | primaryKey = 'id'
- | )
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}/$inputTable'
- """.stripMargin)
-
- withTable(generateTableName) { target =>
+ Seq(true, false).foreach { enableBulkInsert =>
+ val bulkInsertConf: Array[(String, String)] = if (enableBulkInsert) {
+ Array(SPARK_SQL_INSERT_INTO_OPERATION.key ->
WriteOperationType.BULK_INSERT.value())
+ } else {
+ Array()
+ }
+ withSQLConf(bulkInsertConf: _*) {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ withTable(generateTableName) { inputTable =>
spark.sql(
s"""
- |create table $target (
+ |create table $inputTable (
| id int,
| name string,
| price double,
@@ -914,21 +903,36 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| primaryKey = 'id'
| )
| partitioned by (dt)
- | location '${tmp.getCanonicalPath}/$target'
+ | location '${tmp.getCanonicalPath}/$inputTable'
""".stripMargin)
- spark.sql(s"insert into $target values(3, 'c1', 13,
'2021-07-17')")
- spark.sql(s"insert into $target values(1, 'a1', 10,
'2021-07-18')")
-
- // Insert overwrite a partition with empty record
- spark.sql(s"insert overwrite table $target
partition(dt='2021-07-17') select id, name, price from $inputTable")
- // TODO enable result check after fix
https://issues.apache.org/jira/browse/HUDI-6828
- // checkAnswer(s"select id, name, price, dt from $target order
by id")(
- // Seq(1, "a1", 10.0, "2021-07-18")
- // )
+
+ withTable(generateTableName) { target =>
+ spark.sql(
+ s"""
+ |create table $target (
+ | id int,
+ | name string,
+ | price double,
+ | dt string
+ |) using hudi
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id'
+ | )
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$target'
+ """.stripMargin)
+ spark.sql(s"insert into $target values(3, 'c1', 13,
'2021-07-17')")
+ spark.sql(s"insert into $target values(1, 'a1', 10,
'2021-07-18')")
+
+ // Insert overwrite a partition with empty record
+ spark.sql(s"insert overwrite table $target
partition(dt='2021-07-17') select id, name, price from $inputTable")
+ checkAnswer(s"select id, name, price, dt from $target where
dt='2021-07-17'")(Seq.empty: _*)
+ }
}
}
- }
- })
+ })
+ }
}
}