This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d65668611e3 [HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
d65668611e3 is described below
commit d65668611e3e0f514d2b20fc3cf0f360146f532e
Author: StreamingFlames <[email protected]>
AuthorDate: Thu Jan 12 16:29:15 2023 +0800
[HUDI-5317] Fix insert overwrite table for partitioned table (#7365)
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 21 +++--
.../command/InsertIntoHoodieTableCommand.scala | 18 ++--
.../apache/spark/sql/hudi/TestInsertTable.scala | 104 +++++++++++++++++++++
.../spark/sql/hudi/catalog/HoodieCatalog.scala | 4 +-
.../sql/hudi/catalog/HoodieInternalV2Table.scala | 18 ++--
5 files changed, 135 insertions(+), 30 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index fcba6e310dc..db2b93eda08 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -95,7 +95,8 @@ trait ProvidesHoodieConfig extends Logging {
*/
def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
- isOverwrite: Boolean,
+ isOverwritePartition: Boolean,
+ isOverwriteTable: Boolean,
insertPartitions: Map[String, Option[String]] =
Map.empty,
extraOptions: Map[String, String]): Map[String,
String] = {
@@ -139,24 +140,24 @@ trait ProvidesHoodieConfig extends Logging {
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
val operation =
- (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode,
isPartitionedTable) match {
- case (true, _, _, false, _) =>
+ (enableBulkInsert, isOverwritePartition, isOverwriteTable,
dropDuplicate, isNonStrictMode, isPartitionedTable) match {
+ case (true, _, _, _, false, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert in ${insertMode.value()} mode.")
- case (true, true, _, _, true) =>
+ case (true, true, _, _, _, true) =>
throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
- case (true, _, true, _, _) =>
+ case (true, _, _, true, _, _) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// if enableBulkInsert is true, use bulk insert for the insert
overwrite non-partitioned table.
- case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
+ case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite table
- case (false, true, _, _, false) =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ case (false, false, true, _, _, _) =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// insert overwrite partition
- case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+ case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
// disable dropDuplicate, and provide preCombineKey, use the upsert
operation for strict and upsert mode.
- case (false, false, false, false, _) if hasPrecombineColumn =>
UPSERT_OPERATION_OPT_VAL
+ case (false, false, false, false, false, _) if hasPrecombineColumn =>
UPSERT_OPERATION_OPT_VAL
// if table is pk table and has enableBulkInsert use bulk insert for
non-strict mode.
- case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+ case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
// for the rest case, use the insert operation
case _ => INSERT_OPERATION_OPT_VAL
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 0228e5ddcf7..2e4c1db099e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -86,16 +86,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
val catalogTable = new HoodieCatalogTable(sparkSession, table)
- val config = buildHoodieInsertConfig(catalogTable, sparkSession,
overwrite, partitionSpec, extraOptions)
- // NOTE: In case of partitioned table we override specified "overwrite"
parameter
- // to instead append to the dataset
- val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
- SaveMode.Overwrite
+ var mode = SaveMode.Append
+ var isOverWriteTable = false
+ var isOverWritePartition = false
+ if (overwrite && catalogTable.partitionFields.isEmpty) {
+ // insert overwrite non-partition table
+ mode = SaveMode.Overwrite
+ isOverWriteTable = true
} else {
- SaveMode.Append
+ // for insert into or insert overwrite partition we use append mode.
+ mode = SaveMode.Append
+ isOverWritePartition = overwrite
}
+ val config = buildHoodieInsertConfig(catalogTable, sparkSession,
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
+
val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec,
sparkSession.sessionState.conf)
val (success, _, _, _, _, _) =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config,
Dataset.ofRows(sparkSession, alignedQuery))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 73aaabc0d8f..b6444b52b52 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
@@ -443,6 +444,109 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
+ test("Test Insert Overwrite Table for V2 Table") {
+ withSQLConf("hoodie.schema.on.read.enable" -> "true") {
+ withRecordType()(withTempDir { tmp =>
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (primaryKey = 'id', preCombineField='dt')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ """.stripMargin)
+
+ // Test insert overwrite table
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName
+ | values(1, 'a1', 10.0, 1000, '2021-01-05')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05")
+ )
+
+ // Insert overwrite table
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName
+ | values (2, 'a2', 10.0, 1000, '2021-01-06')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName order
by id")(
+ Seq(2, "a2", 10.0, 1000, "2021-01-06")
+ )
+
+ // Insert overwrite static partition
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName partition(dt = '2021-01-05')
+ | select * from (select 2 , 'a2', 12.0, 1000) limit 10
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName order
by dt")(
+ Seq(2, "a2", 12.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 10.0, 1000, "2021-01-06")
+ )
+
+ // Insert data from another table
+ val tblNonPartition = generateTableName
+ spark.sql(
+ s"""
+ | create table $tblNonPartition (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | tblproperties (primaryKey = 'id')
+ | location '${tmp.getCanonicalPath}/$tblNonPartition'
+ """.stripMargin)
+ spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName partition(dt ='2021-01-04')
+ | select * from $tblNonPartition limit 10
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName order
by id,dt")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-04"),
+ Seq(2, "a2", 12.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 10.0, 1000, "2021-01-06")
+ )
+
+ // Insert overwrite partitioned table, all partitions will be
truncated
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName
+ | select id + 2, name, price, ts , '2021-01-04' from
$tblNonPartition limit 10
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName " +
+ s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
+ Seq(3, "a1", 10.0, 1000, "2021-01-04")
+ )
+
+ // Test insert overwrite non-partitioned table
+ spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2',
10.0, 1000")
+ checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+ Seq(2, "a2", 10.0, 1000)
+ )
+
+ spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2',
10.0, 2000")
+ checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
+ Seq(2, "a2", 10.0, 2000)
+ )
+ }
+ })
+ }
+ }
+
+
test("Test Different Type of Partition Column") {
withRecordType()(withTempDir { tmp =>
val typeAndValue = Seq(
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index eeef56d3cff..6d3610db21e 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -293,13 +293,13 @@ class HoodieCatalog extends DelegatingCatalogExtension
DataSourceWriteOptions.SQL_INSERT_MODE.key ->
InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
- saveSourceDF(sourceQuery, tableDesc.properties ++
buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false,
Map.empty, options))
+ saveSourceDF(sourceQuery, tableDesc.properties ++
buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition =false,
isOverwriteTable = false, Map.empty, options))
CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable,
ignoreIfExists = false)
} else if (sourceQuery.isEmpty) {
saveSourceDF(sourceQuery, tableDesc.properties)
new CreateHoodieTableCommand(tableDesc, false).run(spark)
} else {
- saveSourceDF(sourceQuery, tableDesc.properties ++
buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false,
Map.empty, Map.empty))
+ saveSourceDF(sourceQuery, tableDesc.properties ++
buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition =
false, isOverwriteTable = false, Map.empty, Map.empty))
new CreateHoodieTableCommand(tableDesc, false).run(spark)
}
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
index 9968095f3a5..b41c7456b71 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -89,15 +89,16 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
spark: SparkSession)
extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {
- private var forceOverwrite = false
+ private var overwriteTable = false
+ private var overwritePartition = false
override def truncate(): HoodieV1WriteBuilder = {
- forceOverwrite = true
+ overwriteTable = true
this
}
override def overwrite(filters: Array[Filter]): WriteBuilder = {
- forceOverwrite = true
+ overwritePartition = true
this
}
@@ -105,17 +106,10 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
override def toInsertableRelation: InsertableRelation = {
new InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
- val mode = if (forceOverwrite &&
hoodieCatalogTable.partitionFields.isEmpty) {
- // insert overwrite non-partition table
- SaveMode.Overwrite
- } else {
- // for insert into or insert overwrite partition we use append
mode.
- SaveMode.Append
- }
alignOutputColumns(data).write.format("org.apache.hudi")
- .mode(mode)
+ .mode(SaveMode.Append)
.options(buildHoodieConfig(hoodieCatalogTable) ++
- buildHoodieInsertConfig(hoodieCatalogTable, spark,
forceOverwrite, Map.empty, Map.empty))
+ buildHoodieInsertConfig(hoodieCatalogTable, spark,
overwritePartition, overwriteTable, Map.empty, Map.empty))
.save()
}
}