This is an automated email from the ASF dual-hosted git repository.
mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7a6272fba1 [HUDI-3781] fix spark delete sql can not delete record (#5215)
7a6272fba1 is described below
commit 7a6272fba150fa9d7acd0b57cc4041ec49019faf
Author: KnightChess <[email protected]>
AuthorDate: Fri Apr 8 14:26:40 2022 +0800
[HUDI-3781] fix spark delete sql can not delete record (#5215)
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 5 ++-
.../apache/spark/sql/hudi/TestDeleteTable.scala | 47 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 74255473b5..31fb0ad6cb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -255,7 +255,10 @@ trait ProvidesHoodieConfig extends Logging {
val hoodieProps = getHoodieProps(catalogProperties, tableConfig,
sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
- withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
+ // operation can not be overwrite
+ val options = hoodieCatalogTable.catalogProperties.-(OPERATION.key())
+
+ withSparkConf(sparkSession, options) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index f005a14d7f..9c693f9626 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -151,4 +151,51 @@ class TestDeleteTable extends TestHoodieSqlBase {
}
}
}
+
+ test("Test Delete Table with op upsert") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach {tableType =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.operation = 'upsert'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+
+ // delete data from table
+ spark.sql(s"delete from $tableName where id = 1")
+ checkAnswer(s"select count(1) from $tableName") (
+ Seq(0)
+ )
+
+ spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
+ spark.sql(s"delete from $tableName where id = 1")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(2, "a2", 10.0, 1000)
+ )
+
+ spark.sql(s"delete from $tableName")
+ checkAnswer(s"select count(1) from $tableName")(
+ Seq(0)
+ )
+ }
+ }
+ }
}