nsivabalan commented on code in PR #12588:
URL: https://github.com/apache/hudi/pull/12588#discussion_r1906248293
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1165,3 +1163,54 @@ class HoodieSparkSqlWriterInternal {
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq
newQueryExecution)
}
}
+
+object HoodieSparkSqlWriterInternal {
+ // Check if duplicates should be dropped.
+ def shouldDropDuplicatesForInserts(hoodieConfig: HoodieConfig): Boolean = {
+ hoodieConfig.contains(INSERT_DUP_POLICY) &&
+
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(DROP_INSERT_DUP_POLICY)
+ }
+
+ // Check if we should fail if duplicates are found.
+ def shouldFailWhenDuplicatesFound(hoodieConfig: HoodieConfig): Boolean = {
+ hoodieConfig.contains(INSERT_DUP_POLICY) &&
+
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(FAIL_INSERT_DUP_POLICY)
+ }
+
+ // Check if deduplication is required.
+ def isDeduplicationRequired(hoodieConfig: HoodieConfig): Boolean = {
+ hoodieConfig.getBoolean(INSERT_DROP_DUPS) ||
+ shouldFailWhenDuplicatesFound(hoodieConfig) ||
+ shouldDropDuplicatesForInserts(hoodieConfig)
+ }
+
+ // Check if deduplication is needed.
+ def isDeduplicationNeeded(operation: WriteOperationType): Boolean = {
+ operation == WriteOperationType.INSERT ||
+ operation == WriteOperationType.INSERT_PREPPED
+ }
+
+ def handleInsertDuplicates(incomingRecords: JavaRDD[HoodieRecord[_]],
+ hoodieConfig: HoodieConfig,
+ operation: WriteOperationType,
+ jsc: JavaSparkContext,
+ parameters: Map[String, String]):
JavaRDD[HoodieRecord[_]] = {
+ // If no deduplication is needed, return the incoming records as is
+ if (!isDeduplicationRequired(hoodieConfig) ||
!isDeduplicationNeeded(operation)) {
+ incomingRecords
+ } else {
+ // Perform deduplication
+ val deduplicatedRecords = DataSourceUtils.dropDuplicates(jsc,
incomingRecords, parameters.asJava)
Review Comment:
It would be better if we pushed this down into
```
public static JavaRDD<HoodieRecord> dropDuplicates(HoodieSparkEngineContext
engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig) {
try {
SparkRDDReadClient client = new SparkRDDReadClient<>(engineContext,
writeConfig);
return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>)
r).isCurrentLocationKnown());
} catch (TableNotFoundException e) {
// this will be executed when there is no hoodie table yet
// so no dups to drop
return incomingHoodieRecords;
}
}
```
this method in DataSourceUtils only.
Why trigger the DAG twice?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala:
##########
@@ -3087,4 +3089,140 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
}
+
+ test("Test table with insert dup policy - drop case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") {
Review Comment:
Can we add tests for all 3 drop dup policies at the Spark datasource layer?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1165,3 +1163,54 @@ class HoodieSparkSqlWriterInternal {
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq
newQueryExecution)
}
}
+
+object HoodieSparkSqlWriterInternal {
+ // Check if duplicates should be dropped.
+ def shouldDropDuplicatesForInserts(hoodieConfig: HoodieConfig): Boolean = {
+ hoodieConfig.contains(INSERT_DUP_POLICY) &&
+
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(DROP_INSERT_DUP_POLICY)
+ }
+
+ // Check if we should fail if duplicates are found.
+ def shouldFailWhenDuplicatesFound(hoodieConfig: HoodieConfig): Boolean = {
+ hoodieConfig.contains(INSERT_DUP_POLICY) &&
+
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(FAIL_INSERT_DUP_POLICY)
+ }
+
+ // Check if deduplication is required.
+ def isDeduplicationRequired(hoodieConfig: HoodieConfig): Boolean = {
+ hoodieConfig.getBoolean(INSERT_DROP_DUPS) ||
+ shouldFailWhenDuplicatesFound(hoodieConfig) ||
+ shouldDropDuplicatesForInserts(hoodieConfig)
+ }
+
+ // Check if deduplication is needed.
+ def isDeduplicationNeeded(operation: WriteOperationType): Boolean = {
Review Comment:
Where did you pull this from?
Is it somewhere in master, or are we newly introducing it in this patch?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala:
##########
@@ -3087,4 +3089,140 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
}
+
+ test("Test table with insert dup policy - drop case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | `id` string,
+ | `name` string,
+ | `dt` bigint,
+ | `day` STRING,
+ | `hour` INT
+ |) using hudi
+ |tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'MOR',
+ | 'preCombineField'='dt',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.hash.field' = 'id',
+ | 'hoodie.bucket.index.num.buckets'= 512
+ | )
+ partitioned by (`day`,`hour`)
+ location '${tablePath}'
+ """.stripMargin)
+
+ spark.sql("set spark.sql.shuffle.partitions = 11")
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as
`day`, 10 as `hour`
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as
`day`, 10 as `hour`
+ |""".stripMargin)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, dt, day, hour from $targetTable limit
10")(
+ Seq("1", "aa", 123, "2024-02-19", 10)
+ )
+ }
+ }
+ }
+
+ test("Test table with insert dup policy - fail case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | `id` string,
+ | `name` string,
+ | `dt` bigint,
+ | `day` STRING,
+ | `hour` INT
+ |) using hudi
+ |tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'MOR',
+ | 'preCombineField'= 'dt',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.hash.field' = 'id',
+ | 'hoodie.bucket.index.num.buckets'= 512
+ | )
+ partitioned by (`day`,`hour`)
+ location '${tablePath}'
+ """.stripMargin)
+
+ spark.sql("set spark.sql.shuffle.partitions = 11")
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as
`day`, 10 as `hour`
+ |""".stripMargin)
+ checkException(
+ () => spark.sql(s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as
`day`, 10 as `hour`
+ |""".stripMargin))(s"Duplicate records detected. The number of
records after deduplication (0) is less than the incoming records (1).")
+ }
+ }
+ }
+
+ test("Test table with insert dup policy - none case") {
+ withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") {
+ withTempDir { tmp =>
+ val targetTable = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
+
+ spark.sql(
+ s"""
+ |create table ${targetTable} (
+ | `id` string,
+ | `name` string,
+ | `dt` bigint,
+ | `day` STRING,
+ | `hour` INT
+ |) using hudi
+ |tblproperties (
+ | 'primaryKey' = 'id',
+ | 'type' = 'MOR',
+ | 'preCombineField'= 'dt',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.hash.field' = 'id',
+ | 'hoodie.bucket.index.num.buckets'= 512
+ | )
+ partitioned by (`day`,`hour`)
+ location '${tablePath}'
+ """.stripMargin)
+
+ spark.sql("set spark.sql.shuffle.partitions = 11")
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as
`day`, 10 as `hour`
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into ${targetTable}
+ |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as
`day`, 10 as `hour`
+ |""".stripMargin)
+
+ // check result after insert and merge data into target table
+ checkAnswer(s"select id, name, dt, day, hour from $targetTable limit
10")(
Review Comment:
Can we have `if`, `else if`, and `else` branches here and have just 1 test
method? Why duplicate the test code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]