This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit cc8f397f6b52c8d2cf9c77ba90aa75e34d618096
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Mar 18 17:26:03 2025 -0700

    [HUDI-9083] Fixing flakiness with multi writer test (#12987)
---
 .../org/apache/hudi/client/TestHoodieClientMultiWriter.java    | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index a55a52f5bf9..f2b76780f46 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantComparison;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -51,6 +52,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -812,6 +814,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
     final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
     final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+    final String upsertCommitTime = client1.createNewInstantTime(); // upsert commit time has to be lesser than compaction instant time.
+    // and w/ MOR table, during conflict resolution, we will definitely hit conflict resolution exception.
+    // if the delta commit's instant time is not guaranteed to be < compaction instant time, then delta commit will succeed w/o issues.
 
     // Test with concurrent operations could be flaky, to reduce possibility of wrong ordering some queue is added
     // For InProcessLockProvider we could wait less
@@ -820,7 +825,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
-      final String newCommitTime = client1.createNewInstantTime();
       final int numRecords = 100;
       final String commitTimeBetweenPrevAndNew = secondCommitTime;
 
@@ -834,11 +838,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
         // Since the concurrent modifications went in, this upsert has
         // to fail
         assertThrows(HoodieWriteConflictException.class, () -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), upsertCommitTime, numRecords);
         });
       } else {
         // We don't have the compaction for COW and so this upsert
         // has to pass
+        final String newCommitTime = client1.createNewInstantTime();
         assertDoesNotThrow(() -> {
           createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
         });
@@ -850,6 +855,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
       if (tableType == MERGE_ON_READ) {
         assertDoesNotThrow(() -> {
           String compactionTimeStamp = client2.createNewInstantTime();
+          ValidationUtils.checkArgument(InstantComparison.compareTimestamps(compactionTimeStamp, InstantComparison.GREATER_THAN, upsertCommitTime));
           client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
         });
       }

Reply via email to