This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4bdb98b50c203e9f049734f80e9ee68110aff835 Author: wombatu-kun <[email protected]> AuthorDate: Sun Mar 3 07:44:26 2024 +0700 [HUDI-6089] Handle default insert behaviour to ingest duplicates (#10728) Co-authored-by: Vova Kolmakov <[email protected]> --- .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 2 +- .../main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java | 1 + .../src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java | 1 + .../src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala | 4 +++- .../test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala | 2 ++ .../apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java | 2 ++ 6 files changed, 10 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4e1cdb9f5d3..99915fca25a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -544,7 +544,7 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty<String> MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE = ConfigProperty .key("hoodie.merge.allow.duplicate.on.inserts") - .defaultValue("false") + .defaultValue("true") .markAdvanced() .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)." + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained."); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index 7c42ccf5016..243b74b9199 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -86,6 +86,7 @@ public class HoodieMetadataWriteUtils { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(writeConfig.getEngineType()) .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .withMergeAllowDuplicateOnInserts(false) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder() .withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled()) .withInitialConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getInitialConsistencyCheckIntervalMs()) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index f9d5d69aec0..a5c31fd8347 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -89,6 +89,7 @@ public class TestHoodieWriteConfig { assertEquals(5, config.getMaxCommitsToKeep()); assertEquals(2, config.getMinCommitsToKeep()); assertTrue(config.shouldUseExternalSchemaTransformation()); + assertTrue(config.allowDuplicateInserts()); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 82684912965..602881b6d2d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -389,6 +389,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 12.0) ) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") assertThrows[HoodieDuplicateKeyException] { try { spark.sql(s"insert into $tableName select 1, 'a1', 10") @@ -1183,7 +1184,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } test("Test combine before insert") { - withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") { + withSQLConf("hoodie.sql.bulk.insert.enable" -> "false", "hoodie.merge.allow.duplicate.on.inserts" -> "false") { withRecordType()(withTempDir{tmp => val tableName = generateTableName spark.sql( @@ -1497,6 +1498,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(3, "a3", 30.0, 3000, "2021-01-07") ) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") spark.sql( s""" | insert into $tableName values diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index d5dcfd01ad1..ef76cb72ca5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -917,6 +917,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { | partitioned by(dt) | location '${tmp.getCanonicalPath}' """.stripMargin) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") spark.sql( s""" @@ -965,6 +966,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { | partitioned by(dt) | location '${path1}' """.stripMargin) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") spark.sql(s"insert into $sourceTable values(1, 'a1', cast(3.01 as double), 11, '2022-09-26'),(2, 'a2', cast(3.02 as double), 12, '2022-09-27'),(3, 'a3', cast(3.03 as double), 13, '2022-09-28'),(4, 'a4', cast(3.04 as double), 14, '2022-09-29')") diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 7847feee8e8..502baf34ff4 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -1137,6 +1137,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", "true", "3")); cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false")); + cfg.configs.add(String.format("%s=%s", "hoodie.merge.allow.duplicate.on.inserts", "false")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); @@ -1200,6 +1201,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "", "true", "3")); + cfg.configs.add(String.format("%s=%s", "hoodie.merge.allow.duplicate.on.inserts", "false")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
