This is an automated email from the ASF dual-hosted git repository.
pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 26b324f267e5 fix(concurrency): detect rollback conflicts with ongoing
commit operations (#18089)
26b324f267e5 is described below
commit 26b324f267e507399bb5a0d8157a450ccb797d1a
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Mar 19 19:04:45 2026 -0700
fix(concurrency): detect rollback conflicts with ongoing commit operations
(#18089)
---
.../client/transaction/ConcurrentOperation.java | 28 +++++-
.../PreferWriterConflictResolutionStrategy.java | 15 ++-
...urrentFileWritesConflictResolutionStrategy.java | 40 +++++++-
.../TestConflictResolutionStrategyUtil.java | 19 ++++
...TestPreferWriterConflictResolutionStrategy.java | 106 +++++++++++++++++++++
5 files changed, 204 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index 177119a789ab..80c7df2aca3e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.transaction;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieMetadataWrapper;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -47,6 +48,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_AC
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static
org.apache.hudi.common.util.CommitUtils.getPartitionAndFileIdWithoutSuffixFromSpecificRecord;
/**
@@ -68,8 +70,11 @@ public class ConcurrentOperation {
private final String actionType;
@ToString.Include
private final String instantTime;
+ private final HoodieTableMetaClient metaClient;
@Getter
private Set<Pair<String, String>> mutatedPartitionAndFileIds =
Collections.emptySet();
+ @Getter
+ private String rolledbackCommit;
public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient
metaClient) throws IOException {
// Replace inflight compaction and clustering to requested since inflight
does not contain the plan.
@@ -82,6 +87,7 @@ public class ConcurrentOperation {
this.actionState = instant.getState().name();
this.actionType = instant.getAction();
this.instantTime = instant.requestedTime();
+ this.metaClient = metaClient; // used only by the other concurrent
operation (which reads from timeline)
init(instant);
}
@@ -91,7 +97,13 @@ public class ConcurrentOperation {
this.actionState = instant.getState().name();
this.actionType = instant.getAction();
this.instantTime = instant.requestedTime();
- init(instant);
+ this.metaClient = null; // used only by the other concurrent operation
(which reads from timeline)
+ try {
+ init(instant);
+ } catch (IOException e) {
+ // This should never happen since we are initializing with commit
metadata
+ throw new RuntimeException("Failed to initialize ConcurrentOperation for
instant: " + instant, e);
+ }
}
public String getInstantActionState() {
@@ -106,7 +118,7 @@ public class ConcurrentOperation {
return instantTime;
}
- private void init(HoodieInstant instant) {
+ private void init(HoodieInstant instant) throws IOException {
if (this.metadataWrapper.isAvroMetadata()) {
switch (getInstantActionType()) {
case COMPACTION_ACTION:
@@ -128,6 +140,18 @@ public class ConcurrentOperation {
this.operationType =
WriteOperationType.fromValue(avroCommitMeta.getOperationType());
}
break;
+ case ROLLBACK_ACTION:
+ this.operationType = WriteOperationType.UNKNOWN;
+ if (!instant.isCompleted()) {
+ // requested rollback instants have rollback plan in the details;
(inflight rollback is empty).
+ // irrespective of requested/inflight, always read rollback plan.
+ if (this.metaClient != null) {
+ HoodieInstant requested =
metaClient.getInstantGenerator().getRollbackRequestedInstant(instant);
+ HoodieRollbackPlan rollbackPlan =
metaClient.getActiveTimeline().readRollbackPlan(requested);
+ this.rolledbackCommit =
rollbackPlan.getInstantToRollback().getCommitTime();
+ }
+ }
+ break;
case REPLACE_COMMIT_ACTION:
case CLUSTERING_ACTION:
if (instant.isCompleted()) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 2c3942f7169e..36a6d5a81ef8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -72,12 +72,25 @@ public class PreferWriterConflictResolutionStrategy
&& writeConfigOpt.isPresent() &&
writeConfigOpt.get().isClusteringBlockForPendingIngestion();
if (isCurrentOperationClustering ||
COMPACTION_ACTION.equals(currentInstant.getAction())) {
+ // Table service rollbacks are done by table service jobs/writers only,
not by ingestion threads,
+ // so rollback conflict detection is not needed for table services.
return getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant, isCurrentOperationClustering, metaClient, writeConfigOpt);
} else {
- return getCandidateInstantsForNonTableServicesCommits(activeTimeline,
currentInstant);
+ return
Stream.concat(getCandidateInstantsForNonTableServicesCommits(activeTimeline,
currentInstant),
+ getCandidateInstantsForRollbackConflict(activeTimeline,
currentInstant));
}
}
+ private Stream<HoodieInstant>
getCandidateInstantsForRollbackConflict(HoodieActiveTimeline activeTimeline,
HoodieInstant currentInstant) {
+ // Add Requested rollback action instants that were created after the
current instant.
+ List<HoodieInstant> pendingRollbacks = activeTimeline
+ .findInstantsAfter(currentInstant.requestedTime())
+ .filterPendingRollbackTimeline()
+ .getInstantsAsStream().collect(Collectors.toList());
+ log.info(String.format("Rollback instants that may have conflict with %s
are %s", currentInstant, pendingRollbacks));
+ return pendingRollbacks.stream();
+ }
+
private Stream<HoodieInstant>
getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline
activeTimeline, HoodieInstant currentInstant) {
// To find out which instants are conflicting, we apply the following logic
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index 9445499ed8a9..e2eaa5310303 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
@@ -130,6 +131,11 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
@Override
public boolean hasConflict(ConcurrentOperation thisOperation,
ConcurrentOperation otherOperation) {
+ // Check for rollback conflicts first
+ if (isRollbackConflict(thisOperation, otherOperation)) {
+ return true;
+ }
+
// TODO : UUID's can clash even for insert/insert, handle that case.
Set<Pair<String, String>> partitionAndFileIdsSetForFirstInstant =
thisOperation.getMutatedPartitionAndFileIds();
Set<Pair<String, String>> partitionAndFileIdsSetForSecondInstant =
otherOperation.getMutatedPartitionAndFileIds();
@@ -143,6 +149,38 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
return false;
}
+ /**
+ * Check whether there is a rollback operation in progress that tries to
rollback the commit created by this
+ * operation.
+ *
+ * @param thisOperation first concurrent commit operation
+ * @param otherOperation concurrent rollback operation
+ * @return true if there is a rollback conflict, false otherwise
+ */
+ private boolean isRollbackConflict(ConcurrentOperation thisOperation,
ConcurrentOperation otherOperation) {
+ // Check if otherOperation is rollback
+ if (isRollbackOperation(otherOperation)) {
+ String rolledbackCommit = otherOperation.getRolledbackCommit();
+ String thisCommitTimestamp = thisOperation.getInstantTimestamp();
+ if (rolledbackCommit != null &&
rolledbackCommit.equals(thisCommitTimestamp)) {
+ log.error("Found rollback conflict: rollback operation " +
otherOperation
+ + " is rolling back commit " + thisCommitTimestamp + " created by
operation " + thisOperation);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Check if the given operation is a rollback operation.
+ *
+ * @param operation concurrent operation to check
+ * @return true if it's a rollback operation, false otherwise
+ */
+ private boolean isRollbackOperation(ConcurrentOperation operation) {
+ return ROLLBACK_ACTION.equals(operation.getInstantActionType());
+ }
+
@Override
public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
@@ -163,7 +201,7 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
// Conflict arises only if the log compaction commit has a lesser
timestamp compared to compaction commit.
return thisOperation.getCommitMetadataOption();
}
- // just abort the current write if conflicts are found
+ // just abort the current write if conflicts are found (rollback
conflicts are also aborted here).
throw new HoodieWriteConflictException(new
ConcurrentModificationException("Cannot resolve conflicts for overlapping
writes between first operation = " + thisOperation
+ ", second operation = " + otherOperation));
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index ea7ddac7aa4b..fd6237b7e9ca 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -22,7 +22,9 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -296,4 +298,21 @@ public class TestConflictResolutionStrategyUtil {
replaceMetadata.setOperationType(writeOperationType);
return replaceMetadata;
}
+
+ public static void createRollbackRequested(String rollbackInstantTime,
String commitToRollback, HoodieTableMetaClient metaClient) throws Exception {
+ // Create a rollback plan that targets the specified commit
+ HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+ rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitToRollback,
"commit"));
+ rollbackPlan.setVersion(TimelineLayoutVersion.CURR_VERSION);
+
+ HoodieTestTable.of(metaClient).addRequestedRollback(rollbackInstantTime,
rollbackPlan);
+ }
+
+ public static void createRollbackInflight(String rollbackInstantTime, String
commitToRollback, HoodieTableMetaClient metaClient) throws Exception {
+ // First create the requested rollback, then transition to inflight
+ createRollbackRequested(rollbackInstantTime, commitToRollback, metaClient);
+
+ // Create the inflight rollback file
+ HoodieTestTable.of(metaClient).addInflightRollback(rollbackInstantTime);
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index f59490c77aef..99a39c79d805 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -25,6 +25,7 @@ import
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -35,6 +36,8 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.List;
@@ -49,6 +52,8 @@ import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterInflight;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createClusterRequested;
+import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackInflight;
+import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRollbackRequested;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
public class TestPreferWriterConflictResolutionStrategy extends
HoodieCommonTestHarness {
@@ -560,4 +565,105 @@ public class TestPreferWriterConflictResolutionStrategy
extends HoodieCommonTest
Assertions.assertEquals(currentWriterInstant,
candidateInstants.get(0).requestedTime());
}
+ /**
+ * Positive testcase, ensures that conflict is flagged for an on-going
rollback that is targeting the inflight commit.
+ * @param rollbackRequestedOnly - if true, creates .rollback.requested only,
otherwise creates .rollback.inflight
+ * @throws Exception
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testConcurrentRollbackAndCommitConflict(boolean
rollbackRequestedOnly) throws Exception {
+ // Create a base commit that the rollback will target
+ String targetCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommit(targetCommitTime, metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+ // Consider commits before this are all successful
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // Start a new commit (inflight ingestion commit)
+ String inflightCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createInflightCommit(inflightCommitTime, metaClient);
+
+ // Start a rollback operation targeting the same commit timestamp as the
inflight commit
+ String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime();
+ if (rollbackRequestedOnly) {
+ createRollbackRequested(rollbackInstantTime, inflightCommitTime,
metaClient);
+ } else {
+ createRollbackInflight(rollbackInstantTime, inflightCommitTime,
metaClient);
+ }
+
+ // Set up the conflict resolution strategy
+ Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, inflightCommitTime));
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(inflightCommitTime);
+
+ metaClient.reloadActiveTimeline();
+ List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
+ Collectors.toList());
+
+ // The rollback operation should be detected as a candidate instant
+ Assertions.assertTrue(candidateInstants.size() == 1);
+ ConcurrentOperation rollbackOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation commitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
+
+ // The strategy should detect a conflict between the rollback and commit
operations
+ Assertions.assertTrue(strategy.hasConflict(commitOperation,
rollbackOperation));
+
+ // Attempting to resolve the conflict should throw an exception
+ try {
+ strategy.resolveConflict(null, commitOperation, rollbackOperation);
+ Assertions.fail("Cannot reach here, rollback and commit should have
thrown a conflict");
+ } catch (HoodieWriteConflictException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Negative testcase, ensures that conflict is not flagged for an on-going
rollback that is targeting
+ * a different inflight commit.
+ * @param rollbackRequestedOnly - if true, creates .rollback.requested only,
otherwise creates .rollback.inflight
+ * @throws Exception
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testConcurrentRollbackAndCommitNoConflict(boolean
rollbackRequestedOnly) throws Exception {
+ // Create two different commits
+ String targetCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommit(targetCommitTime, metaClient);
+ String differentCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommit(differentCommitTime, metaClient);
+
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // Start a new commit (inflight ingestion commit)
+ String inflightCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createInflightCommit(inflightCommitTime, metaClient);
+
+ // Start a rollback operation targeting a different commit (not the
inflight one)
+ String rollbackInstantTime = WriteClientTestUtils.createNewInstantTime();
+ if (rollbackRequestedOnly) {
+ createRollbackRequested(rollbackInstantTime, targetCommitTime,
metaClient);
+ } else {
+ createRollbackInflight(rollbackInstantTime, targetCommitTime,
metaClient);
+ }
+
+ // Set up the conflict resolution strategy
+ Option<HoodieInstant> currentInstant =
Option.of(INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, inflightCommitTime));
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
PreferWriterConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(inflightCommitTime);
+
+ metaClient.reloadActiveTimeline();
+ List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
+ Collectors.toList());
+
+ // The rollback operation should be detected as a candidate instant
+ Assertions.assertTrue(candidateInstants.size() == 1);
+ ConcurrentOperation rollbackOperation = new
ConcurrentOperation(candidateInstants.get(0), metaClient);
+ ConcurrentOperation commitOperation = new
ConcurrentOperation(currentInstant.get(), currentMetadata);
+
+ // The strategy should NOT detect a conflict since the rollback targets a
different commit
+ Assertions.assertFalse(strategy.hasConflict(commitOperation,
rollbackOperation));
+ }
}