This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9d3a8d5924d5ca95489e52adf41c418bb2eb6663 Author: Zouxxyy <[email protected]> AuthorDate: Wed Feb 1 13:54:37 2023 +0800 [HUDI-5317] Fix insert overwrite table for partitioned table (#7793) --- .../command/InsertIntoHoodieTableCommand.scala | 4 +- .../apache/spark/sql/hudi/TestInsertTable.scala | 120 ++------------------- 2 files changed, 12 insertions(+), 112 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 2e4c1db099e..f07611ad019 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -90,8 +90,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi var mode = SaveMode.Append var isOverWriteTable = false var isOverWritePartition = false - if (overwrite && catalogTable.partitionFields.isEmpty) { - // insert overwrite non-partition table + if (overwrite && partitionSpec.isEmpty) { + // insert overwrite table mode = SaveMode.Overwrite isOverWriteTable = true } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index a227a20b8b6..b092a68e20d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -369,7 +369,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) - // Insert overwrite dynamic partition + + // Insert 
overwrite table spark.sql( s""" | insert overwrite table $tableName @@ -379,14 +380,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(1, "a1", 10.0, 1000, "2021-01-05") ) - // Insert overwrite dynamic partition + // Insert overwrite table spark.sql( s""" | insert overwrite table $tableName | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt """.stripMargin) checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")( - Seq(1, "a1", 10.0, 1000, "2021-01-05"), Seq(2, "a2", 10.0, 1000, "2021-01-06") ) @@ -433,122 +433,22 @@ class TestInsertTable extends HoodieSparkSqlTestBase { """.stripMargin) checkAnswer(s"select id, name, price, ts, dt from $tableName " + s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")( - Seq(2, "a2", 12.0, 1000, "2021-01-05"), - Seq(2, "a2", 10.0, 1000, "2021-01-06"), Seq(3, "a1", 10.0, 1000, "2021-01-04") ) - // test insert overwrite non-partitioned table + // Test insert overwrite non-partitioned table spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10, 1000") checkAnswer(s"select id, name, price, ts from $tblNonPartition")( Seq(2, "a2", 10.0, 1000) ) - }) - } - test("Test Insert Overwrite Table for V2 Table") { - withSQLConf("hoodie.schema.on.read.enable" -> "true") { - withRecordType()(withTempDir { tmp => - if (HoodieSparkUtils.gteqSpark3_2) { - val tableName = generateTableName - // Create a partitioned table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long, - | dt string - |) using hudi - | tblproperties (primaryKey = 'id', preCombineField='dt') - | partitioned by (dt) - | location '${tmp.getCanonicalPath}/$tableName' - """.stripMargin) - - // Test insert overwrite table - spark.sql( - s""" - | insert overwrite table $tableName - | values(1, 'a1', 10.0, 1000, '2021-01-05') - """.stripMargin) - checkAnswer(s"select id, name, price, ts, dt from $tableName")( - Seq(1, "a1", 10.0, 1000, 
"2021-01-05") - ) - - // Insert overwrite table - spark.sql( - s""" - | insert overwrite table $tableName - | values (2, 'a2', 10.0, 1000, '2021-01-06') - """.stripMargin) - checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")( - Seq(2, "a2", 10.0, 1000, "2021-01-06") - ) - - // Insert overwrite static partition - spark.sql( - s""" - | insert overwrite table $tableName partition(dt = '2021-01-05') - | select * from (select 2 , 'a2', 12.0, 1000) limit 10 - """.stripMargin) - checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")( - Seq(2, "a2", 12.0, 1000, "2021-01-05"), - Seq(2, "a2", 10.0, 1000, "2021-01-06") - ) - - // Insert data from another table - val tblNonPartition = generateTableName - spark.sql( - s""" - | create table $tblNonPartition ( - | id int, - | name string, - | price double, - | ts long - | ) using hudi - | tblproperties (primaryKey = 'id') - | location '${tmp.getCanonicalPath}/$tblNonPartition' - """.stripMargin) - spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000") - spark.sql( - s""" - | insert overwrite table $tableName partition(dt ='2021-01-04') - | select * from $tblNonPartition limit 10 - """.stripMargin) - checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")( - Seq(1, "a1", 10.0, 1000, "2021-01-04"), - Seq(2, "a2", 12.0, 1000, "2021-01-05"), - Seq(2, "a2", 10.0, 1000, "2021-01-06") - ) - - // Insert overwrite partitioned table, all partitions will be truncated - spark.sql( - s""" - | insert overwrite table $tableName - | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10 - """.stripMargin) - checkAnswer(s"select id, name, price, ts, dt from $tableName " + - s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")( - Seq(3, "a1", 10.0, 1000, "2021-01-04") - ) - - // Test insert overwrite non-partitioned table - spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000") - checkAnswer(s"select id, 
name, price, ts from $tblNonPartition")( - Seq(2, "a2", 10.0, 1000) - ) - - spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000") - checkAnswer(s"select id, name, price, ts from $tblNonPartition")( - Seq(2, "a2", 10.0, 2000) - ) - } - }) - } + spark.sql(s"insert overwrite table $tblNonPartition select 3, 'a3', 10, 1000") + checkAnswer(s"select id, name, price, ts from $tblNonPartition")( + Seq(3, "a3", 10.0, 1000) + ) + }) } - test("Test Different Type of Partition Column") { withRecordType()(withTempDir { tmp => val typeAndValue = Seq( @@ -666,7 +566,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | tblproperties (primaryKey = 'id') | partitioned by (dt) """.stripMargin) - checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")( + checkException(s"insert overwrite table $tableName3 partition(dt = '2021-07-18') values(1, 'a1', 10, '2021-07-18')")( "Insert Overwrite Partition can not use bulk insert." ) }
