This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e7c7027d80 [HUDI-9371] Fix conflict resolution handling of inflight 
clustering (#13255)
1e7c7027d80 is described below

commit 1e7c7027d800b51bfeecad50ee41d36046b73880
Author: Tim Brown <[email protected]>
AuthorDate: Thu May 15 18:43:38 2025 -0500

    [HUDI-9371] Fix conflict resolution handling of inflight clustering (#13255)
---
 .../client/transaction/ConcurrentOperation.java    |   8 +-
 ...urrentFileWritesConflictResolutionStrategy.java |  10 +-
 .../TestConflictResolutionStrategyUtil.java        |  98 ++++----------
 ...TestPreferWriterConflictResolutionStrategy.java |   2 +-
 ...urrentFileWritesConflictResolutionStrategy.java | 142 ++++++++++++---------
 .../hudi/common/testutils/HoodieTestTable.java     |   4 +-
 6 files changed, 122 insertions(+), 142 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 9580ef26235..28846b10c92 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -61,9 +62,10 @@ public class ConcurrentOperation {
   private Set<Pair<String, String>> mutatedPartitionAndFileIds = 
Collections.emptySet();
 
   public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient 
metaClient) throws IOException {
-    // Replace compaction.inflight to compaction.request since inflight does 
not contain compaction plan.
-    if (instant.getAction().equals(COMPACTION_ACTION) && 
instant.getState().equals(HoodieInstant.State.INFLIGHT)) {
-      instant = metaClient.createNewInstant(HoodieInstant.State.REQUESTED, 
COMPACTION_ACTION, instant.requestedTime());
+    // Replace inflight compaction and clustering to requested since inflight 
does not contain the plan.
+    if ((instant.getAction().equals(COMPACTION_ACTION) || 
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(), instant, 
metaClient.getInstantGenerator()))
+        && instant.getState().equals(HoodieInstant.State.INFLIGHT)) {
+      instant = metaClient.createNewInstant(HoodieInstant.State.REQUESTED, 
instant.getAction(), instant.requestedTime());
     }
     this.metadataWrapper = new 
HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant, 
metaClient));
     this.commitMetadataOption = Option.empty();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index 62bb867e497..8cbc141e0e8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 
@@ -55,9 +56,9 @@ public class 
SimpleConcurrentFileWritesConflictResolutionStrategy
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
     // To find which instants are conflicting, we apply the following logic
     // 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
-    // 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
-    // after the current instant. We need to check for write conflicts since 
they may have mutated the same files
-    // that are being newly created by the current write.
+    // 2. Get any scheduled or completed compaction that have started and/or 
finished after the current instant.
+    // 3. Get any completed replace commit that happened since the last 
successful write and any pending replace commit.
+    // We need to check for write conflicts since they may have mutated the 
same files that are being newly created by the current write.
     Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
         .getCommitsTimeline()
         .filterCompletedInstants()
@@ -67,8 +68,7 @@ public class 
SimpleConcurrentFileWritesConflictResolutionStrategy
     Stream<HoodieInstant> compactionAndClusteringPendingTimeline = 
activeTimeline
         .filterPendingReplaceClusteringAndCompactionTimeline()
         .filter(instant -> ClusteringUtils.isClusteringInstant(activeTimeline, 
instant, metaClient.getInstantGenerator())
-            || HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()))
-        .findInstantsAfter(currentInstant.requestedTime())
+            || (!HoodieTimeline.CLUSTERING_ACTION.equals(instant.getAction()) 
&& compareTimestamps(instant.requestedTime(), GREATER_THAN, 
currentInstant.requestedTime())))
         .getInstantsAsStream();
     return Stream.concat(completedCommitsInstantStream, 
compactionAndClusteringPendingTimeline);
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index 557716fac3a..32de4ab768a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -36,12 +36,11 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.Option;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-
 public class TestConflictResolutionStrategyUtil {
 
   public static void createCommit(String instantTime, HoodieTableMetaClient 
metaClient) throws Exception {
@@ -116,34 +115,48 @@ public class TestConflictResolutionStrategyUtil {
     String fileId2 = "file-2";
 
     // create replace instant to mark fileId1 as deleted
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
buildRequestedReplaceMetadata(fileId1, WriteOperationType.CLUSTER);
+    HoodieTestTable.of(metaClient)
+        .addRequestedCluster(instantTime, requestedReplaceMetadata)
+        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+  }
+
+  static HoodieRequestedReplaceMetadata buildRequestedReplaceMetadata(String 
fileId1, WriteOperationType writeOperationType) {
     HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
-    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
+    requestedReplaceMetadata.setOperationType(writeOperationType.name());
     HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
     HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
     HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
     sliceInfo.setFileId(fileId1);
     
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
-    clusteringGroup.setSlices(Arrays.asList(sliceInfo));
-    clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
+    clusteringGroup.setSlices(Collections.singletonList(sliceInfo));
+    clusteringPlan.setInputGroups(Collections.singletonList(clusteringGroup));
     requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
     requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
-    HoodieTestTable.of(metaClient)
-        .addRequestedCluster(instantTime, requestedReplaceMetadata)
-        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+    return requestedReplaceMetadata;
   }
 
   public static void createClusterInflight(String instantTime, 
HoodieTableMetaClient metaClient) throws Exception {
+    HoodieTestTable.of(metaClient).addInflightCluster(instantTime);
+  }
+
+  public static void createReplaceInflight(String instantTime, 
HoodieTableMetaClient metaClient) throws Exception {
     String fileId1 = "file-1";
     String fileId2 = "file-2";
 
+    HoodieReplaceCommitMetadata inflightReplaceMetadata = 
buildReplaceCommitMetadata(WriteOperationType.INSERT_OVERWRITE);
+    HoodieTestTable.of(metaClient)
+        .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata))
+        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+  }
+
+  private static HoodieReplaceCommitMetadata 
buildReplaceCommitMetadata(WriteOperationType insertOverwrite) {
     HoodieReplaceCommitMetadata inflightReplaceMetadata = new 
HoodieReplaceCommitMetadata();
-    
inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
+    inflightReplaceMetadata.setOperationType(insertOverwrite);
     HoodieWriteStat writeStat = new HoodieWriteStat();
     writeStat.setFileId("file-1");
     
inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 writeStat);
-    HoodieTestTable.of(metaClient)
-        .addInflightCluster(instantTime, Option.of(inflightReplaceMetadata))
-        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+    return inflightReplaceMetadata;
   }
 
   public static void createPendingInsertOverwrite(String instantTime, 
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws 
Exception {
@@ -173,17 +186,7 @@ public class TestConflictResolutionStrategyUtil {
     
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 writeStat);
     replaceMetadata.setOperationType(writeOperationType);
     // create replace instant to mark fileId1 as deleted
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
-    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
-    HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
-    HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
-    HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
-    sliceInfo.setFileId(fileId1);
-    
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
-    clusteringGroup.setSlices(Arrays.asList(sliceInfo));
-    clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
-    requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
-    requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
buildRequestedReplaceMetadata(fileId1, WriteOperationType.INSERT_OVERWRITE);
     HoodieTestTable.of(metaClient)
         .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), 
Option.empty(), replaceMetadata)
         
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
@@ -203,17 +206,7 @@ public class TestConflictResolutionStrategyUtil {
     
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 writeStat);
     replaceMetadata.setOperationType(writeOperationType);
     // create replace instant to mark fileId1 as deleted
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
-    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
-    HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
-    HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
-    HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
-    sliceInfo.setFileId(fileId1);
-    
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
-    clusteringGroup.setSlices(Arrays.asList(sliceInfo));
-    clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
-    requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
-    requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
buildRequestedReplaceMetadata(fileId1, writeOperationType);
     HoodieTestTable.of(metaClient)
         .addCluster(instantTime, requestedReplaceMetadata, Option.empty(), 
replaceMetadata)
         
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
@@ -223,38 +216,12 @@ public class TestConflictResolutionStrategyUtil {
     String fileId1 = "file-1";
     String fileId2 = "file-2";
     // create replace instant to mark fileId2 as deleted
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
-    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
-    HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
-    HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
-    HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
-    sliceInfo.setFileId(fileId2);
-    
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
-    clusteringGroup.setSlices(Arrays.asList(sliceInfo));
-    clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
-    requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
-    requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
buildRequestedReplaceMetadata(fileId2, writeOperationType);
     HoodieTestTable.of(metaClient)
         .addPendingCluster(instantTime, requestedReplaceMetadata, 
Option.empty())
         
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
   }
 
-  public static void createCompleteReplace(String instantTime, 
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws 
Exception {
-    String fileId1 = "file-1";
-    String fileId2 = "file-2";
-
-    // create replace instant to mark fileId2 as deleted
-    HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
-    Map<String, List<String>> partitionFileIds = new HashMap<>();
-    partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
Arrays.asList(fileId2));
-    replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds);
-    HoodieWriteStat writeStat = new HoodieWriteStat();
-    writeStat.setFileId("file-2");
-    
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 writeStat);
-    replaceMetadata.setOperationType(writeOperationType);
-    FileCreateUtilsLegacy.createReplaceCommit(COMMIT_METADATA_SER_DE, 
metaClient.getBasePath().toString(), instantTime, replaceMetadata);
-  }
-
   public static void createPendingCompaction(String instantTime, 
HoodieTableMetaClient metaClient) throws Exception {
     String fileId1 = "file-1";
     HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
@@ -306,15 +273,6 @@ public class TestConflictResolutionStrategyUtil {
         
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
   }
 
-  public static void createClusterInflight(String instantTime, 
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws 
Exception {
-    Option<HoodieReplaceCommitMetadata> inflightReplaceMetadata = 
Option.empty();
-    if (WriteOperationType.INSERT_OVERWRITE.equals(writeOperationType)) {
-      inflightReplaceMetadata = 
Option.of(createReplaceCommitMetadata(WriteOperationType.INSERT_OVERWRITE));
-    }
-    HoodieTestTable.of(metaClient)
-        .addInflightCluster(instantTime, inflightReplaceMetadata);
-  }
-
   private static HoodieReplaceCommitMetadata 
createReplaceCommitMetadata(WriteOperationType writeOperationType) {
     String fileId1 = "file-1";
     HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index 4d68ad8d6b5..f1cc3dfc51e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -174,7 +174,7 @@ public class TestPreferWriterConflictResolutionStrategy 
extends HoodieCommonTest
     // clustering 1 gets scheduled
     String newInstantTime = metaClient.createNewInstantTime();
     createClusterRequested(newInstantTime, metaClient);
-    createClusterInflight(newInstantTime, WriteOperationType.CLUSTER, 
metaClient);
+    createClusterInflight(newInstantTime, metaClient);
 
     Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
     PreferWriterConflictResolutionStrategy strategy = new 
PreferWriterConflictResolutionStrategy();
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index 9488fd8d797..a749fce98f4 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -40,8 +40,9 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.buildRequestedReplaceMetadata;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCluster;
+import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterInflight;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterRequested;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommitMetadata;
@@ -50,11 +51,12 @@ import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteCommit;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteCompaction;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit;
-import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCluster;
+import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingInsertOverwrite;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRequestedCommit;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 
 public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends 
HoodieCommonTestHarness {
 
@@ -115,32 +117,50 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 conflicts with writer 2
-    Assertions.assertTrue(candidateInstants.size() == 1);
+    Assertions.assertEquals(1, candidateInstants.size());
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, writer 1 and writer 2 should have 
thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   @Test
   public void testConcurrentWritesWithReplaceInflightCommit() throws Exception 
{
-    
TestConflictResolutionStrategyUtil.createClusterInflight(metaClient.createNewInstantTime(),
 metaClient);
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    String currentWriterInstant = metaClient.createNewInstantTime();
+    createInflightCommit(currentWriterInstant, metaClient);
+    Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+
+    String replaceInstant = metaClient.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedReplace(replaceInstant, 
Option.empty());
+    TestConflictResolutionStrategyUtil.createReplaceInflight(replaceInstant, 
metaClient);
     Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
 
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
SimpleConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = 
createCommitMetadata(currentWriterInstant);
+    metaClient.reloadActiveTimeline();
+
+    List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
+        Collectors.toList());
+
+    // writer 1 conflicts with writer 2
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
+  }
+
+  @Test
+  public void testConcurrentWritesWithClusteringInflightCommit() throws 
Exception {
     // writer 1 starts
     String currentWriterInstant = metaClient.createNewInstantTime();
     createInflightCommit(currentWriterInstant, metaClient);
     Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
 
-    // writer 2 starts and finishes
-    String newInstantTime = metaClient.createNewInstantTime();
-    TestConflictResolutionStrategyUtil.createClusterInflight(newInstantTime, 
metaClient);
+    String clusteringInstantTime = metaClient.createNewInstantTime();
+    createClusterRequested(clusteringInstantTime, metaClient);
+    Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+    createClusterInflight(clusteringInstantTime, metaClient);
 
     SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
SimpleConcurrentFileWritesConflictResolutionStrategy();
     HoodieCommitMetadata currentMetadata = 
createCommitMetadata(currentWriterInstant);
@@ -154,12 +174,35 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, writer 1 and writer 2 should have 
thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
+  }
+
+  @Test
+  public void testConcurrentWritesWithLegacyClusteringInflightCommit() throws 
Exception {
+    String clusteringInstantTime = metaClient.createNewInstantTime();
+    // create a replace commit with a clustering operation to mimic a commit 
written by a v6 writer
+    HoodieTestTable.of(metaClient).addRequestedReplace(clusteringInstantTime, 
Option.of(buildRequestedReplaceMetadata("file-1", WriteOperationType.CLUSTER)));
+    Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+    HoodieTestTable.of(metaClient).addInflightReplace(clusteringInstantTime, 
Option.empty());
+
+    // writer 1 starts
+    String currentWriterInstant = metaClient.createNewInstantTime();
+    createInflightCommit(currentWriterInstant, metaClient);
+    Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
SimpleConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = 
createCommitMetadata(currentWriterInstant);
+    metaClient.reloadActiveTimeline();
+
+    List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
+        Collectors.toList());
+
+    // writer 1 conflicts with writer 2
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   @Test
@@ -182,16 +225,11 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 conflicts with scheduled compaction plan 1
-    Assertions.assertTrue(candidateInstants.size() == 1);
+    Assertions.assertEquals(1, candidateInstants.size());
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, should have thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   @Test
@@ -214,16 +252,11 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 conflicts with compaction 1
-    Assertions.assertTrue(candidateInstants.size() == 1);
+    Assertions.assertEquals(1, candidateInstants.size());
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, should have thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   /**
@@ -244,7 +277,6 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
 
     Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
     SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new 
SimpleConcurrentFileWritesConflictResolutionStrategy();
-    HoodieCommitMetadata currentMetadata = 
createCommitMetadata(currentWriterInstant);
     metaClient.reloadActiveTimeline();
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
@@ -276,12 +308,7 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, should have thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   @Test
@@ -304,16 +331,11 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 conflicts with cluster 1
-    Assertions.assertTrue(candidateInstants.size() == 1);
+    Assertions.assertEquals(1, candidateInstants.size());
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, should have thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   @Test
@@ -336,20 +358,15 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 conflicts with replace 1
-    Assertions.assertTrue(candidateInstants.size() == 1);
+    Assertions.assertEquals(1, candidateInstants.size());
     ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
     ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
     Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-    try {
-      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
-      Assertions.fail("Cannot reach here, should have thrown a conflict");
-    } catch (HoodieWriteConflictException e) {
-      // expected
-    }
+    Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
   }
 
   @Test
-  public void tstConcurrentWritesWithPendingInsertOverwriteReplace() throws 
Exception {
+  public void testConcurrentWritesWithPendingInsertOverwriteReplace() throws 
Exception {
     createCommit(metaClient.createNewInstantTime(), metaClient);
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     // consider commits before this are all successful
@@ -370,7 +387,10 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     List<HoodieInstant> candidateInstants = 
strategy.getCandidateInstants(metaClient, currentInstant.get(), 
lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 will not conflict with insert_overwrite 1
-    Assertions.assertTrue(candidateInstants.size() == 0);
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new 
ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertFalse(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
   }
 
   // try to simulate HUDI-3355
@@ -422,10 +442,10 @@ public class 
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
     for (HoodieInstant instant : completedInstantsDuringCurrentWriteOperation) 
{
       ConcurrentOperation thatCommitOperation = new 
ConcurrentOperation(instant, metaClient);
       Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, 
thatCommitOperation));
-      try {
-        strategy.resolveConflict(null, thisCommitOperation, 
thatCommitOperation);
-      } catch (HoodieWriteConflictException e) {
-        // expected
+      if (instant.requestedTime().equals(newCompactionInstantTimeC11)) {
+        Assertions.assertDoesNotThrow(() -> strategy.resolveConflict(null, 
thisCommitOperation, thatCommitOperation));
+      } else {
+        Assertions.assertThrows(HoodieWriteConflictException.class, () -> 
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
       }
     }
   }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 1032c97d784..c61af9329a6 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -376,8 +376,8 @@ public class HoodieTestTable implements AutoCloseable {
     return this;
   }
 
-  public HoodieTestTable addInflightCluster(String instantTime, 
Option<HoodieReplaceCommitMetadata> inflightReplaceMetadata) throws Exception {
-    createInflightClusterCommit(metaClient, 
metaClient.getCommitMetadataSerDe(), instantTime, inflightReplaceMetadata);
+  public HoodieTestTable addInflightCluster(String instantTime) throws 
Exception {
+    createInflightClusterCommit(metaClient, 
metaClient.getCommitMetadataSerDe(), instantTime, Option.empty());
     currentInstantTime = instantTime;
     return this;
   }


Reply via email to