This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch partition-bucket-index in repository https://gitbox.apache.org/repos/asf/hudi.git
commit cc8f397f6b52c8d2cf9c77ba90aa75e34d618096 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Tue Mar 18 17:26:03 2025 -0700 [HUDI-9083] Fixing flakiness with multi writer test (#12987) --- .../org/apache/hudi/client/TestHoodieClientMultiWriter.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index a55a52f5bf9..f2b76780f46 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.InstantComparison; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.testutils.HoodieTestTable; @@ -51,6 +52,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieClusteringConfig; @@ -812,6 +814,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2); final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg); + final String upsertCommitTime = client1.createNewInstantTime(); // upsert commit time has to be lesser than compaction instant time. + // and w/ MOR table, during conflict resolution, we will definitely hit conflict resolution exception. + // if the delta commit's instant time is not guaranteed to be < compaction instant time, then delta commit will succeed w/o issues. // Test with concurrent operations could be flaky, to reduce possibility of wrong ordering some queue is added // For InProcessLockProvider we could wait less @@ -820,7 +825,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { // Create upserts, schedule cleaning, schedule compaction in parallel Future future1 = executors.submit(() -> { - final String newCommitTime = client1.createNewInstantTime(); final int numRecords = 100; final String commitTimeBetweenPrevAndNew = secondCommitTime; @@ -834,11 +838,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { // Since the concurrent modifications went in, this upsert has // to fail assertThrows(HoodieWriteConflictException.class, () -> { - createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords); + createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), upsertCommitTime, numRecords); }); } else { // We don't have the compaction for COW and so this upsert // has to pass + final String newCommitTime = client1.createNewInstantTime(); assertDoesNotThrow(() -> { createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords); }); @@ -850,6 +855,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { if (tableType == MERGE_ON_READ) { assertDoesNotThrow(() -> { String compactionTimeStamp = client2.createNewInstantTime(); + ValidationUtils.checkArgument(InstantComparison.compareTimestamps(compactionTimeStamp, InstantComparison.GREATER_THAN, upsertCommitTime)); client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT); }); }
