This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1e7c7027d80 [HUDI-9371] Fix conflict resolution handling of inflight
clustering (#13255)
1e7c7027d80 is described below
commit 1e7c7027d800b51bfeecad50ee41d36046b73880
Author: Tim Brown <[email protected]>
AuthorDate: Thu May 15 18:43:38 2025 -0500
[HUDI-9371] Fix conflict resolution handling of inflight clustering (#13255)
---
.../client/transaction/ConcurrentOperation.java | 8 +-
...urrentFileWritesConflictResolutionStrategy.java | 10 +-
.../TestConflictResolutionStrategyUtil.java | 98 ++++----------
...TestPreferWriterConflictResolutionStrategy.java | 2 +-
...urrentFileWritesConflictResolutionStrategy.java | 142 ++++++++++++---------
.../hudi/common/testutils/HoodieTestTable.java | 4 +-
6 files changed, 122 insertions(+), 142 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 9580ef26235..28846b10c92 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -61,9 +62,10 @@ public class ConcurrentOperation {
private Set<Pair<String, String>> mutatedPartitionAndFileIds =
Collections.emptySet();
public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient
metaClient) throws IOException {
- // Replace compaction.inflight to compaction.request since inflight does
not contain compaction plan.
- if (instant.getAction().equals(COMPACTION_ACTION) &&
instant.getState().equals(HoodieInstant.State.INFLIGHT)) {
- instant = metaClient.createNewInstant(HoodieInstant.State.REQUESTED,
COMPACTION_ACTION, instant.requestedTime());
+ // Replace inflight compaction and clustering with requested since inflight
does not contain the plan.
+ if ((instant.getAction().equals(COMPACTION_ACTION) ||
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator()))
+ && instant.getState().equals(HoodieInstant.State.INFLIGHT)) {
+ instant = metaClient.createNewInstant(HoodieInstant.State.REQUESTED,
instant.getAction(), instant.requestedTime());
}
this.metadataWrapper = new
HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant,
metaClient));
this.commitMetadataOption = Option.empty();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index 62bb867e497..8cbc141e0e8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
@@ -55,9 +56,9 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
// To find which instants are conflicting, we apply the following logic
// 1. Get completed instants timeline only for commits that have happened
since the last successful write.
- // 2. Get any scheduled or completed compaction or clustering operations
that have started and/or finished
- // after the current instant. We need to check for write conflicts since
they may have mutated the same files
- // that are being newly created by the current write.
+ // 2. Get any scheduled or completed compactions that have started and/or
finished after the current instant.
+ // 3. Get any completed replace commit that happened since the last
successful write and any pending replace commit.
+ // We need to check for write conflicts since they may have mutated the
same files that are being newly created by the current write.
Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
.getCommitsTimeline()
.filterCompletedInstants()
@@ -67,8 +68,7 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
Stream<HoodieInstant> compactionAndClusteringPendingTimeline =
activeTimeline
.filterPendingReplaceClusteringAndCompactionTimeline()
.filter(instant -> ClusteringUtils.isClusteringInstant(activeTimeline,
instant, metaClient.getInstantGenerator())
- || HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()))
- .findInstantsAfter(currentInstant.requestedTime())
+ || (!HoodieTimeline.CLUSTERING_ACTION.equals(instant.getAction())
&& compareTimestamps(instant.requestedTime(), GREATER_THAN,
currentInstant.requestedTime())))
.getInstantsAsStream();
return Stream.concat(completedCommitsInstantStream,
compactionAndClusteringPendingTimeline);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index 557716fac3a..32de4ab768a 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -36,12 +36,11 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-
public class TestConflictResolutionStrategyUtil {
public static void createCommit(String instantTime, HoodieTableMetaClient
metaClient) throws Exception {
@@ -116,34 +115,48 @@ public class TestConflictResolutionStrategyUtil {
String fileId2 = "file-2";
// create replace instant to mark fileId1 as deleted
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
buildRequestedReplaceMetadata(fileId1, WriteOperationType.CLUSTER);
+ HoodieTestTable.of(metaClient)
+ .addRequestedCluster(instantTime, requestedReplaceMetadata)
+
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ }
+
+ static HoodieRequestedReplaceMetadata buildRequestedReplaceMetadata(String
fileId1, WriteOperationType writeOperationType) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
-
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
+ requestedReplaceMetadata.setOperationType(writeOperationType.name());
HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
sliceInfo.setFileId(fileId1);
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
- clusteringGroup.setSlices(Arrays.asList(sliceInfo));
- clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
+ clusteringGroup.setSlices(Collections.singletonList(sliceInfo));
+ clusteringPlan.setInputGroups(Collections.singletonList(clusteringGroup));
requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
- HoodieTestTable.of(metaClient)
- .addRequestedCluster(instantTime, requestedReplaceMetadata)
-
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ return requestedReplaceMetadata;
}
public static void createClusterInflight(String instantTime,
HoodieTableMetaClient metaClient) throws Exception {
+ HoodieTestTable.of(metaClient).addInflightCluster(instantTime);
+ }
+
+ public static void createReplaceInflight(String instantTime,
HoodieTableMetaClient metaClient) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";
+ HoodieReplaceCommitMetadata inflightReplaceMetadata =
buildReplaceCommitMetadata(WriteOperationType.INSERT_OVERWRITE);
+ HoodieTestTable.of(metaClient)
+ .addInflightReplace(instantTime, Option.of(inflightReplaceMetadata))
+
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ }
+
+ private static HoodieReplaceCommitMetadata
buildReplaceCommitMetadata(WriteOperationType insertOverwrite) {
HoodieReplaceCommitMetadata inflightReplaceMetadata = new
HoodieReplaceCommitMetadata();
-
inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
+ inflightReplaceMetadata.setOperationType(insertOverwrite);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId("file-1");
inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
writeStat);
- HoodieTestTable.of(metaClient)
- .addInflightCluster(instantTime, Option.of(inflightReplaceMetadata))
-
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ return inflightReplaceMetadata;
}
public static void createPendingInsertOverwrite(String instantTime,
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws
Exception {
@@ -173,17 +186,7 @@ public class TestConflictResolutionStrategyUtil {
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
writeStat);
replaceMetadata.setOperationType(writeOperationType);
// create replace instant to mark fileId1 as deleted
- HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
-
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
- HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
- HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
- HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
- sliceInfo.setFileId(fileId1);
-
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
- clusteringGroup.setSlices(Arrays.asList(sliceInfo));
- clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
- requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
- requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
buildRequestedReplaceMetadata(fileId1, WriteOperationType.INSERT_OVERWRITE);
HoodieTestTable.of(metaClient)
.addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata),
Option.empty(), replaceMetadata)
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
@@ -203,17 +206,7 @@ public class TestConflictResolutionStrategyUtil {
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
writeStat);
replaceMetadata.setOperationType(writeOperationType);
// create replace instant to mark fileId1 as deleted
- HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
-
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
- HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
- HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
- HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
- sliceInfo.setFileId(fileId1);
-
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
- clusteringGroup.setSlices(Arrays.asList(sliceInfo));
- clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
- requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
- requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
buildRequestedReplaceMetadata(fileId1, writeOperationType);
HoodieTestTable.of(metaClient)
.addCluster(instantTime, requestedReplaceMetadata, Option.empty(),
replaceMetadata)
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
@@ -223,38 +216,12 @@ public class TestConflictResolutionStrategyUtil {
String fileId1 = "file-1";
String fileId2 = "file-2";
// create replace instant to mark fileId2 as deleted
- HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
-
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.name());
- HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
- HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
- HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
- sliceInfo.setFileId(fileId2);
-
sliceInfo.setPartitionPath(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
- clusteringGroup.setSlices(Arrays.asList(sliceInfo));
- clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
- requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
- requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
buildRequestedReplaceMetadata(fileId2, writeOperationType);
HoodieTestTable.of(metaClient)
.addPendingCluster(instantTime, requestedReplaceMetadata,
Option.empty())
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
}
- public static void createCompleteReplace(String instantTime,
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws
Exception {
- String fileId1 = "file-1";
- String fileId2 = "file-2";
-
- // create replace instant to mark fileId2 as deleted
- HoodieReplaceCommitMetadata replaceMetadata = new
HoodieReplaceCommitMetadata();
- Map<String, List<String>> partitionFileIds = new HashMap<>();
- partitionFileIds.put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
Arrays.asList(fileId2));
- replaceMetadata.setPartitionToReplaceFileIds(partitionFileIds);
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setFileId("file-2");
-
replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
writeStat);
- replaceMetadata.setOperationType(writeOperationType);
- FileCreateUtilsLegacy.createReplaceCommit(COMMIT_METADATA_SER_DE,
metaClient.getBasePath().toString(), instantTime, replaceMetadata);
- }
-
public static void createPendingCompaction(String instantTime,
HoodieTableMetaClient metaClient) throws Exception {
String fileId1 = "file-1";
HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
@@ -306,15 +273,6 @@ public class TestConflictResolutionStrategyUtil {
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
}
- public static void createClusterInflight(String instantTime,
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws
Exception {
- Option<HoodieReplaceCommitMetadata> inflightReplaceMetadata =
Option.empty();
- if (WriteOperationType.INSERT_OVERWRITE.equals(writeOperationType)) {
- inflightReplaceMetadata =
Option.of(createReplaceCommitMetadata(WriteOperationType.INSERT_OVERWRITE));
- }
- HoodieTestTable.of(metaClient)
- .addInflightCluster(instantTime, inflightReplaceMetadata);
- }
-
private static HoodieReplaceCommitMetadata
createReplaceCommitMetadata(WriteOperationType writeOperationType) {
String fileId1 = "file-1";
HoodieReplaceCommitMetadata replaceMetadata = new
HoodieReplaceCommitMetadata();
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index 4d68ad8d6b5..f1cc3dfc51e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -174,7 +174,7 @@ public class TestPreferWriterConflictResolutionStrategy
extends HoodieCommonTest
// clustering 1 gets scheduled
String newInstantTime = metaClient.createNewInstantTime();
createClusterRequested(newInstantTime, metaClient);
- createClusterInflight(newInstantTime, WriteOperationType.CLUSTER,
metaClient);
+ createClusterInflight(newInstantTime, metaClient);
Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
PreferWriterConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index 9488fd8d797..a749fce98f4 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -40,8 +40,9 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.buildRequestedReplaceMetadata;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCluster;
+import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterInflight;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterRequested;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommitMetadata;
@@ -50,11 +51,12 @@ import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteCommit;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCompleteCompaction;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit;
-import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCluster;
+import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingInsertOverwrite;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRequestedCommit;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
public class TestSimpleConcurrentFileWritesConflictResolutionStrategy extends
HoodieCommonTestHarness {
@@ -115,32 +117,50 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
// writer 1 conflicts with writer 2
- Assertions.assertTrue(candidateInstants.size() == 1);
+ Assertions.assertEquals(1, candidateInstants.size());
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, writer 1 and writer 2 should have
thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
@Test
public void testConcurrentWritesWithReplaceInflightCommit() throws Exception
{
-
TestConflictResolutionStrategyUtil.createClusterInflight(metaClient.createNewInstantTime(),
metaClient);
- HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ String currentWriterInstant = metaClient.createNewInstantTime();
+ createInflightCommit(currentWriterInstant, metaClient);
+ Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+
+ String replaceInstant = metaClient.createNewInstantTime();
+ HoodieTestTable.of(metaClient).addRequestedReplace(replaceInstant,
Option.empty());
+ TestConflictResolutionStrategyUtil.createReplaceInflight(replaceInstant,
metaClient);
Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
SimpleConcurrentFileWritesConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant);
+ metaClient.reloadActiveTimeline();
+
+ List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
+ Collectors.toList());
+
+ // writer 1 conflicts with writer 2
+ Assertions.assertEquals(1, candidateInstants.size());
+ ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
+ Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
+ }
+
+ @Test
+ public void testConcurrentWritesWithClusteringInflightCommit() throws
Exception {
// writer 1 starts
String currentWriterInstant = metaClient.createNewInstantTime();
createInflightCommit(currentWriterInstant, metaClient);
Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
- // writer 2 starts and finishes
- String newInstantTime = metaClient.createNewInstantTime();
- TestConflictResolutionStrategyUtil.createClusterInflight(newInstantTime,
metaClient);
+ String clusteringInstantTime = metaClient.createNewInstantTime();
+ createClusterRequested(clusteringInstantTime, metaClient);
+ Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+ createClusterInflight(clusteringInstantTime, metaClient);
SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
SimpleConcurrentFileWritesConflictResolutionStrategy();
HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant);
@@ -154,12 +174,35 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, writer 1 and writer 2 should have
thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
+ }
+
+ @Test
+ public void testConcurrentWritesWithLegacyClusteringInflightCommit() throws
Exception {
+ String clusteringInstantTime = metaClient.createNewInstantTime();
+ // create a replace commit with a clustering operation to mimic a commit
written by a v6 writer
+ HoodieTestTable.of(metaClient).addRequestedReplace(clusteringInstantTime,
Option.of(buildRequestedReplaceMetadata("file-1", WriteOperationType.CLUSTER)));
+ Option<HoodieInstant> lastSuccessfulInstant = Option.empty();
+ HoodieTestTable.of(metaClient).addInflightReplace(clusteringInstantTime,
Option.empty());
+
+ // writer 1 starts
+ String currentWriterInstant = metaClient.createNewInstantTime();
+ createInflightCommit(currentWriterInstant, metaClient);
+ Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
SimpleConcurrentFileWritesConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant);
+ metaClient.reloadActiveTimeline();
+
+ List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
+ Collectors.toList());
+
+ // writer 1 conflicts with writer 2
+ Assertions.assertEquals(1, candidateInstants.size());
+ ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
+ Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
@Test
@@ -182,16 +225,11 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
// writer 1 conflicts with scheduled compaction plan 1
- Assertions.assertTrue(candidateInstants.size() == 1);
+ Assertions.assertEquals(1, candidateInstants.size());
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, should have thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
@Test
@@ -214,16 +252,11 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
// writer 1 conflicts with compaction 1
- Assertions.assertTrue(candidateInstants.size() == 1);
+ Assertions.assertEquals(1, candidateInstants.size());
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, should have thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
/**
@@ -244,7 +277,6 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
SimpleConcurrentFileWritesConflictResolutionStrategy();
- HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant);
metaClient.reloadActiveTimeline();
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
@@ -276,12 +308,7 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, should have thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
@Test
@@ -304,16 +331,11 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
// writer 1 conflicts with cluster 1
- Assertions.assertTrue(candidateInstants.size() == 1);
+ Assertions.assertEquals(1, candidateInstants.size());
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, should have thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
@Test
@@ -336,20 +358,15 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
// writer 1 conflicts with replace 1
- Assertions.assertTrue(candidateInstants.size() == 1);
+ Assertions.assertEquals(1, candidateInstants.size());
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
- Assertions.fail("Cannot reach here, should have thrown a conflict");
- } catch (HoodieWriteConflictException e) {
- // expected
- }
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
@Test
- public void tstConcurrentWritesWithPendingInsertOverwriteReplace() throws
Exception {
+ public void testConcurrentWritesWithPendingInsertOverwriteReplace() throws
Exception {
createCommit(metaClient.createNewInstantTime(), metaClient);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
// consider commits before this are all successful
@@ -370,7 +387,10 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
Collectors.toList());
// writer 1 will not conflict with insert_overwrite 1
- Assertions.assertTrue(candidateInstants.size() == 0);
+ Assertions.assertEquals(1, candidateInstants.size());
+ ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation thisCommitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
+ Assertions.assertFalse(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
}
// try to simulate HUDI-3355
@@ -422,10 +442,10 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
for (HoodieInstant instant : completedInstantsDuringCurrentWriteOperation)
{
ConcurrentOperation thatCommitOperation = new
ConcurrentOperation(instant, metaClient);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation,
thatCommitOperation));
- try {
- strategy.resolveConflict(null, thisCommitOperation,
thatCommitOperation);
- } catch (HoodieWriteConflictException e) {
- // expected
+ if (instant.requestedTime().equals(newCompactionInstantTimeC11)) {
+ Assertions.assertDoesNotThrow(() -> strategy.resolveConflict(null,
thisCommitOperation, thatCommitOperation));
+ } else {
+ Assertions.assertThrows(HoodieWriteConflictException.class, () ->
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation));
}
}
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 1032c97d784..c61af9329a6 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -376,8 +376,8 @@ public class HoodieTestTable implements AutoCloseable {
return this;
}
- public HoodieTestTable addInflightCluster(String instantTime,
Option<HoodieReplaceCommitMetadata> inflightReplaceMetadata) throws Exception {
- createInflightClusterCommit(metaClient,
metaClient.getCommitMetadataSerDe(), instantTime, inflightReplaceMetadata);
+ public HoodieTestTable addInflightCluster(String instantTime) throws
Exception {
+ createInflightClusterCommit(metaClient,
metaClient.getCommitMetadataSerDe(), instantTime, Option.empty());
currentInstantTime = instantTime;
return this;
}