linliu-code commented on code in PR #12588:
URL: https://github.com/apache/hudi/pull/12588#discussion_r1907726395
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala:
##########
@@ -3087,4 +3089,140 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
}
+
+ test("Test table with insert dup policy - drop case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | `id` string,
+ | `name` string,
+ | `dt` bigint,
+ | `day` STRING,
+ | `hour` INT
+ |) using hudi
+ |tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'MOR',
+ | 'preCombineField'='dt',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.hash.field' = 'id',
+ | 'hoodie.bucket.index.num.buckets'= 512
+ | )
+ partitioned by (`day`,`hour`)
+ location '${tablePath}'
+ """.stripMargin)
+
+ spark.sql("set spark.sql.shuffle.partitions = 11")
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
+ |""".stripMargin)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
+ Seq("1", "aa", 123, "2024-02-19", 10)
+ )
+ }
+ }
+ }
+
+ test("Test table with insert dup policy - fail case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | `id` string,
+ | `name` string,
+ | `dt` bigint,
+ | `day` STRING,
+ | `hour` INT
+ |) using hudi
+ |tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'MOR',
+ | 'preCombineField'= 'dt',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.hash.field' = 'id',
+ | 'hoodie.bucket.index.num.buckets'= 512
+ | )
+ partitioned by (`day`,`hour`)
+ location '${tablePath}'
+ """.stripMargin)
+
+ spark.sql("set spark.sql.shuffle.partitions = 11")
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
+ |""".stripMargin)
+ checkException(
+ () => spark.sql(s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
+ |""".stripMargin))(s"Duplicate records detected. The number of records after deduplication (0) is less than the incoming records (1).")
+ }
+ }
+ }
+
+ test("Test table with insert dup policy - none case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | `id` string,
+ | `name` string,
+ | `dt` bigint,
+ | `day` STRING,
+ | `hour` INT
+ |) using hudi
+ |tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'MOR',
+ | 'preCombineField'= 'dt',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.hash.field' = 'id',
+ | 'hoodie.bucket.index.num.buckets'= 512
+ | )
+ partitioned by (`day`,`hour`)
+ location '${tablePath}'
+ """.stripMargin)
+
+ spark.sql("set spark.sql.shuffle.partitions = 11")
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
+ |""".stripMargin)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Review Comment:
That way, if one case fails, the other cases will stop running. I recall that
parameterization does not work here, but I need to confirm.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]