This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 27ac42818bc [HUDI-7183] Fix static insert overwrite partitions issue (#10254)
27ac42818bc is described below
commit 27ac42818bcf3768c2a6742ac0edfcb79c253e52
Author: Wechar Yu <[email protected]>
AuthorDate: Sun Dec 17 11:32:30 2023 +0800
[HUDI-7183] Fix static insert overwrite partitions issue (#10254)
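
    Before this change, a static insert overwrite that specified only some of
    the partition values (e.g. partition (dt='2023-12-06', hh)) lost its static
    semantics: the static partition path was only derived when every partition
    column had a value, so partial specs fell back to dynamic-style replacement,
    and the partition column order in the expected schema could be scrambled.
    The patch resolves all matching partition paths up front and carries them as
    a comma-separated list in HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.

    An illustrative Scala sketch of the two modes, assuming a table partitioned
    by (dt, hh) as in the test added below (table and values are examples only):

      // static mode: every partition matching the PARTITION clause is replaced,
      // including partitions that receive no new rows
      spark.sql("set hoodie.datasource.overwrite.mode = static")
      spark.sql("insert overwrite table t partition (dt='2023-12-06', hh) values (3, 'a3', 10, 1000, '00')")

      // dynamic mode: only partitions that actually receive new rows are replaced
      spark.sql("set hoodie.datasource.overwrite.mode = dynamic")
      spark.sql("insert overwrite table t partition (dt='2023-12-06', hh) values (3, 'a3', 10, 1000, '00')")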
---
.../SparkInsertOverwriteCommitActionExecutor.java | 17 ++--
...setBulkInsertOverwriteCommitActionExecutor.java | 18 ++--
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 7 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 83 ++++++++++--------
.../command/InsertIntoHoodieTableCommand.scala | 32 +------
.../apache/spark/sql/hudi/TestInsertTable.scala | 98 ++++++++++++++++++++++
6 files changed, 177 insertions(+), 78 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index d12efab229d..788e1040783 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -36,7 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.spark.Partitioner;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -81,14 +81,15 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
-      if (StringUtils.isNullOrEmpty(staticOverwritePartition)) {
-        return Collections.emptyMap();
-      } else {
-        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
-      }
+    String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // static insert overwrite partitions
+      List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+      context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
+      return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     } else {
+      // dynamic insert overwrite partitions
       return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     }
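
The executor above previously consulted STATIC_OVERWRITE_PARTITION_PATHS only
when the write produced no statuses, and treated the whole configured string as
a single partition path. It now always honors the config when set, splitting it
on commas and looking up existing file ids per partition in parallel. A rough
Scala sketch of the resulting shape (getAllExistingFileIds is stubbed here; the
real code is the Java above and runs on Spark):

  def getAllExistingFileIds(partitionPath: String): List[String] = Nil // stub
  val staticPartitions = "dt=2023-12-06/hh=00,dt=2023-12-06/hh=01" // sample config value
  val partitionToReplacedFileIds: Map[String, List[String]] =
    staticPartitions.split(",").map(p => p -> getAllExistingFileIds(p)).toMap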
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index c1fd952b106..67ba2027cbd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -33,7 +34,7 @@ import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -60,14 +61,15 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetB
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    if (writeStatuses.isEmpty()) {
-      String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
-      if (staticOverwritePartition == null || staticOverwritePartition.isEmpty()) {
-        return Collections.emptyMap();
-      } else {
-        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
-      }
+    String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // static insert overwrite partitions
+      List<String> partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+      table.getContext().setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of matching static partitions");
+      return HoodieJavaPairRDD.getJavaPairRDD(table.getContext().parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+          partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     } else {
+      // dynamic insert overwrite partitions
       return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index b766e0d315e..c1414fe77fe 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -339,7 +339,12 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
             nullableField
           }
       }.partition(f => partitionFields.contains(f.name))
-      StructType(dataFields ++ partFields)
+      // An insert_overwrite operation with partial partition values can mix up the order
+      // of the partition columns, so we also need to reorder the partition fields here.
+      val nameToField = partFields.map(field => (field.name, field)).toMap
+      val orderedPartFields = partitionFields.map(nameToField(_)).toSeq
+
+      StructType(dataFields ++ orderedPartFields)
     })
     catch {
       case cause: Throwable =>
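
The reordering above matters because a partial PARTITION clause can surface the
partition columns in spec order rather than table order. A self-contained Scala
sketch of the same step with toy values (field names are illustrative):

  import org.apache.spark.sql.types.{StringType, StructField, StructType}
  val partitionFields = Seq("dt", "hh") // table-declared partition order
  val dataFields = Seq(StructField("id", StringType))
  val partFields = Seq(StructField("hh", StringType), StructField("dt", StringType)) // scrambled
  val nameToField = partFields.map(f => f.name -> f).toMap
  val schema = StructType(dataFields ++ partitionFields.map(nameToField)) // id, dt, hh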
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 5492d12d5fb..250067d4b84 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
@@ -32,8 +32,10 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
@@ -334,42 +336,57 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
-  def deduceIsOverwriteTable(sparkSession: SparkSession,
-                             catalogTable: HoodieCatalogTable,
-                             partitionSpec: Map[String, Option[String]],
-                             extraOptions: Map[String, String]): Boolean = {
+  /**
+   * Deduce the overwrite config based on the write operation and overwrite mode configs.
+   * If hoodie.datasource.write.operation is insert_overwrite/insert_overwrite_table, use dynamic overwrite;
+   * else if hoodie.datasource.overwrite.mode is configured, use it;
+   * else use spark.sql.sources.partitionOverwriteMode.
+   *
+   * The returned staticOverwritePartitionPathOpt is defined only in the static insert_overwrite case.
+   *
+   * @return (overwriteMode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt)
+   */
+  def deduceOverwriteConfig(sparkSession: SparkSession,
+                            catalogTable: HoodieCatalogTable,
+                            partitionSpec: Map[String, Option[String]],
+                            extraOptions: Map[String, String]): (SaveMode, Boolean, Boolean, Option[String]) = {
     val combinedOpts: Map[String, String] = combineOptions(catalogTable, catalogTable.tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = Map.empty, overridingOpts = extraOptions)
     val operation = combinedOpts.getOrElse(OPERATION.key, null)
-    operation match {
-      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
-        true
-      case INSERT_OVERWRITE_OPERATION_OPT_VAL =>
-        false
+    val isOverwriteOperation = operation != null &&
+      (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL) || operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL))
+    // If hoodie.datasource.overwrite.mode is configured, respect it; otherwise respect spark.sql.sources.partitionOverwriteMode
+    val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
+      sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
+    val isStaticOverwrite = !isOverwriteOperation && (hoodieOverwriteMode match {
+      case "STATIC" => true
+      case "DYNAMIC" => false
+      case _ => throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
+    })
+    val isOverWriteTable = operation match {
+      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL => true
+      case INSERT_OVERWRITE_OPERATION_OPT_VAL => false
       case _ =>
-        // NonPartitioned table always insert overwrite whole table
-        if (catalogTable.partitionFields.isEmpty) {
-          true
-        } else {
-          // Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
-          if (partitionSpec.nonEmpty) {
-            false
-          } else {
-            // If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
-            val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
-              sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
-
-            hoodieOverwriteMode match {
-              case "STATIC" =>
-                true
-              case "DYNAMIC" =>
-                false
-              case _ =>
-                throw new IllegalArgumentException("Config hoodie.datasource.overwrite.mode is illegal")
-            }
-          }
-        }
+        // There are two cases where we need to use insert_overwrite_table:
+        // 1. a non-partitioned table always insert overwrites the whole table
+        // 2. static mode with no partition values specified
+        catalogTable.partitionFields.isEmpty || (isStaticOverwrite && partitionSpec.isEmpty)
+    }
+    val overwriteMode = if (isOverWriteTable) SaveMode.Overwrite else SaveMode.Append
+    val staticPartitions = if (isStaticOverwrite && !isOverWriteTable) {
+      val fileIndex = HoodieFileIndex(sparkSession, catalogTable.metaClient, None, combinedOpts, FileStatusCache.getOrCreate(sparkSession))
+      val partitionNameToType = catalogTable.partitionSchema.fields.map(field => (field.name, field.dataType)).toMap
+      val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get)
+      val predicates = staticPartitionValues.map { case (k, v) =>
+        val partition = AttributeReference(k, partitionNameToType(k))()
+        val value = Literal(v)
+        EqualTo(partition, value)
+      }.toSeq
+      Option(fileIndex.getPartitionPaths(predicates).map(_.getPath).mkString(","))
+    } else {
+      Option.empty
     }
+    (overwriteMode, isOverWriteTable, !isOverWriteTable, staticPartitions)
   }
 
   def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
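
The table-vs-partition decision in deduceOverwriteConfig reduces to a small
predicate; a hedged, self-contained Scala sketch (the operation-forced cases
and the HoodieFileIndex path resolution are omitted):

  // Returns isOverWriteTable; isOverWritePartition is its negation when overwriting.
  def overwritesWholeTable(partitioned: Boolean, specNonEmpty: Boolean, static: Boolean): Boolean =
    !partitioned || (static && !specNonEmpty)

  assert(overwritesWholeTable(partitioned = false, specNonEmpty = false, static = true))  // non-partitioned table
  assert(!overwritesWholeTable(partitioned = true, specNonEmpty = true, static = true))   // static + PARTITION clause
  assert(overwritesWholeTable(partitioned = true, specNonEmpty = false, static = true))   // static, no clause
  assert(!overwritesWholeTable(partitioned = true, specNonEmpty = false, static = false)) // dynamic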
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 008118bfcce..5a7aec53b63 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -88,19 +88,11 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
              extraOptions: Map[String, String] = Map.empty): Boolean = {
     val catalogTable = new HoodieCatalogTable(sparkSession, table)
 
-    var mode = SaveMode.Append
-    var isOverWriteTable = false
-    var isOverWritePartition = false
-
-    if (overwrite) {
-      if (deduceIsOverwriteTable(sparkSession, catalogTable, partitionSpec, extraOptions)) {
-        isOverWriteTable = true
-        mode = SaveMode.Overwrite
-      } else {
-        isOverWritePartition = true
-      }
+    val (mode, isOverWriteTable, isOverWritePartition, staticOverwritePartitionPathOpt) = if (overwrite) {
+      deduceOverwriteConfig(sparkSession, catalogTable, partitionSpec, extraOptions)
+    } else {
+      (SaveMode.Append, false, false, Option.empty)
     }
-    val staticOverwritePartitionPathOpt = getStaticOverwritePartitionPath(catalogTable, partitionSpec, isOverWritePartition)
 
     val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
     val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
@@ -118,22 +110,6 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     success
   }
 
-  private def getStaticOverwritePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
-                                              partitionsSpec: Map[String, Option[String]],
-                                              isOverWritePartition: Boolean): Option[String] = {
-    if (isOverWritePartition) {
-      val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
-      val isStaticOverwritePartition = staticPartitionValues.keys.size == hoodieCatalogTable.partitionFields.length
-      if (isStaticOverwritePartition) {
-        Option.apply(makePartitionPath(hoodieCatalogTable, staticPartitionValues))
-      } else {
-        Option.empty
-      }
-    } else {
-      Option.empty
-    }
-  }
-
   /**
    * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
   *
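
The removed helper only returned a static partition path when the PARTITION
clause covered every partition column, which is why partial specs lost their
static semantics. A toy Scala sketch of the old guard (names and values are
illustrative):

  val partitionFields = Seq("dt", "hh")
  val staticPartitionValues = Map("dt" -> "2023-12-06") // hh left unspecified
  val oldStaticPath =
    if (staticPartitionValues.size == partitionFields.length)
      Some(partitionFields.map(f => s"$f=${staticPartitionValues(f)}").mkString("/"))
    else None
  assert(oldStaticPath.isEmpty) // partial spec used to fall back to None (dynamic behavior)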
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index a62317d8920..b1b7353c2bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -504,6 +504,104 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
+  test("Test insert overwrite for multi partitioned table") {
+    withRecordType()(Seq("cow", "mor").foreach { tableType =>
+      Seq("dynamic", "static").foreach { overwriteMode =>
+        withTable(generateTableName) { tableName =>
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string,
+               |  hh string
+               |) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id'
+               | )
+               | partitioned by (dt, hh)
+             """.stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into table $tableName values
+               | (0, 'a0', 10, 1000, '2023-12-05', '00'),
+               | (1, 'a1', 10, 1000, '2023-12-06', '00'),
+               | (2, 'a2', 10, 1000, '2023-12-06', '01')
+             """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(
+            Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+            Seq(1, "a1", 10.0, 1000, "2023-12-06", "00"),
+            Seq(2, "a2", 10.0, 1000, "2023-12-06", "01")
+          )
+
+          withSQLConf("hoodie.datasource.overwrite.mode" -> overwriteMode) {
+            // test insert overwrite partitions with partial partition values
+            spark.sql(
+              s"""
+                 | insert overwrite table $tableName partition (dt='2023-12-06', hh) values
+                 | (3, 'a3', 10, 1000, '00'),
+                 | (4, 'a4', 10, 1000, '02')
+               """.stripMargin)
+            val expected = if (overwriteMode.equalsIgnoreCase("dynamic")) {
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(2, "a2", 10.0, 1000, "2023-12-06", "01"),
+                Seq(4, "a4", 10.0, 1000, "2023-12-06", "02")
+              )
+            } else {
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(4, "a4", 10.0, 1000, "2023-12-06", "02")
+              )
+            }
+            checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(expected: _*)
+
+            // test insert overwrite without partition values
+            spark.sql(
+              s"""
+                 | insert overwrite table $tableName values
+                 | (5, 'a5', 10, 1000, '2023-12-06', '02')
+               """.stripMargin)
+            val expected2 = if (overwriteMode.equalsIgnoreCase("dynamic")) {
+              // dynamic mode only overwrites the matching partitions
+              Seq(
+                Seq(0, "a0", 10.0, 1000, "2023-12-05", "00"),
+                Seq(3, "a3", 10.0, 1000, "2023-12-06", "00"),
+                Seq(2, "a2", 10.0, 1000, "2023-12-06", "01"),
+                Seq(5, "a5", 10.0, 1000, "2023-12-06", "02")
+              )
+            } else {
+              // static mode overwrites the whole table
+              Seq(
+                Seq(5, "a5", 10.0, 1000, "2023-12-06", "02")
+              )
+            }
+            checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(expected2: _*)
+
+            // test insert overwrite table
+            withSQLConf("hoodie.datasource.write.operation" -> "insert_overwrite_table") {
+              spark.sql(
+                s"""
+                   | insert overwrite table $tableName partition (dt='2023-12-06', hh) values
+                   | (6, 'a6', 10, 1000, '00')
+                 """.stripMargin)
+              checkAnswer(s"select id, name, price, ts, dt, hh from $tableName")(
+                Seq(6, "a6", 10.0, 1000, "2023-12-06", "00")
+              )
+            }
+          }
+        }
+      }
+    })
+  }
+
   test("Test Different Type of Partition Column") {
     withRecordType()(withTempDir { tmp =>
       val typeAndValue = Seq(