This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4bdb98b50c203e9f049734f80e9ee68110aff835
Author: wombatu-kun <[email protected]>
AuthorDate: Sun Mar 3 07:44:26 2024 +0700

    [HUDI-6089] Handle default insert behaviour to ingest duplicates (#10728)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java       | 2 +-
 .../main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java  | 1 +
 .../src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java   | 1 +
 .../src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala    | 4 +++-
 .../test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala    | 2 ++
 .../apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java  | 2 ++
 6 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 4e1cdb9f5d3..99915fca25a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -544,7 +544,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> 
MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE = ConfigProperty
       .key("hoodie.merge.allow.duplicate.on.inserts")
-      .defaultValue("false")
+      .defaultValue("true")
       .markAdvanced()
       .withDocumentation("When enabled, we allow duplicate keys even if 
inserts are routed to merge with an existing file (for ensuring file sizing)."
           + " This is only relevant for insert operation, since upsert, delete 
operations will ensure unique key constraints are maintained.");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 7c42ccf5016..243b74b9199 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -86,6 +86,7 @@ public class HoodieMetadataWriteUtils {
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
         .withEngineType(writeConfig.getEngineType())
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withMergeAllowDuplicateOnInserts(false)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
             
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
             
.withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs())
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index f9d5d69aec0..a5c31fd8347 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -89,6 +89,7 @@ public class TestHoodieWriteConfig {
     assertEquals(5, config.getMaxCommitsToKeep());
     assertEquals(2, config.getMinCommitsToKeep());
     assertTrue(config.shouldUseExternalSchemaTransformation());
+    assertTrue(config.allowDuplicateInserts());
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 82684912965..602881b6d2d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -389,6 +389,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(2, "a2", 12.0)
       )
 
+      spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false")
       assertThrows[HoodieDuplicateKeyException] {
         try {
           spark.sql(s"insert into $tableName select 1, 'a1', 10")
@@ -1183,7 +1184,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test combine before insert") {
-    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false", 
"hoodie.merge.allow.duplicate.on.inserts" -> "false") {
       withRecordType()(withTempDir{tmp =>
         val tableName = generateTableName
         spark.sql(
@@ -1497,6 +1498,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(3, "a3", 30.0, 3000, "2021-01-07")
       )
 
+      spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false")
       spark.sql(
         s"""
            | insert into $tableName values
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index d5dcfd01ad1..ef76cb72ca5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -917,6 +917,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            | partitioned by(dt)
            | location '${tmp.getCanonicalPath}'
      """.stripMargin)
+      spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false")
 
       spark.sql(
         s"""
@@ -965,6 +966,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
            | partitioned by(dt)
            | location '${path1}'
          """.stripMargin)
+      spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false")
 
       spark.sql(s"insert into $sourceTable values(1, 'a1', cast(3.01 as 
double), 11, '2022-09-26'),(2, 'a2', cast(3.02 as double), 12, 
'2022-09-27'),(3, 'a3', cast(3.03 as double), 13, '2022-09-28'),(4, 'a4', 
cast(3.04 as double), 14, '2022-09-29')")
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7847feee8e8..502baf34ff4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1137,6 +1137,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
     cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.merge.allow.duplicate.on.inserts", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
@@ -1200,6 +1201,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.merge.allow.duplicate.on.inserts", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);

Reply via email to