This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d290b8124a48 fix: HoodieRollbackException During Clean Operation - MDT
(#14008)
d290b8124a48 is described below
commit d290b8124a48d2d28f0d0fb881546628d4c04867
Author: Vamshi Krishna Kyatham
<[email protected]>
AuthorDate: Mon Sep 29 22:29:29 2025 -0700
fix: HoodieRollbackException During Clean Operation - MDT (#14008)
* behavior change: a DT rollback action still triggers a rollback of the
matching pending instant on the MDT;
* change the rollback validation to compare the completion time of the
instant being rolled back against the latest compaction instant time.
---------
Co-authored-by: danny0405 <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 20 +++++----
.../TestHoodieBackedTableMetadataWriter.java | 48 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 7 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4cae3b7833f8..5eb170c97881 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1636,16 +1636,20 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
// The commit which is being rolled back on the dataset
final String commitToRollbackInstantTime =
rollbackMetadata.getCommitsRollback().get(0);
// The deltacommit that will be rolled back
- HoodieInstant deltaCommitInstant =
metadataMetaClient.createNewInstant(HoodieInstant.State.COMPLETED,
- HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
- if
(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().containsInstant(deltaCommitInstant.requestedTime()))
{
- validateRollback(commitToRollbackInstantTime);
+ Option<HoodieInstant> deltaCommitInstantOpt =
metadataMetaClient.getActiveTimeline()
+ .getDeltaCommitTimeline()
+ .filter(s -> s.requestedTime().equals(commitToRollbackInstantTime))
+ .firstInstant();
+ if (deltaCommitInstantOpt.isPresent()) {
+ HoodieInstant deltaCommitInstant = deltaCommitInstantOpt.get();
+ if (deltaCommitInstant.isCompleted()) {
+ validateRollback(deltaCommitInstant);
+ }
LOG.info("Rolling back MDT deltacommit {}",
commitToRollbackInstantTime);
if (!getWriteClient().rollback(commitToRollbackInstantTime,
instantTime)) {
throw new HoodieMetadataException(String.format("Failed to rollback
deltacommit at %s", commitToRollbackInstantTime));
}
} else {
- // if the instant is pending on MDT or not even exists the timeline,
just ignore.
LOG.info("Ignoring rollback of instant {} at {}. The commit to
rollback is not found in MDT",
commitToRollbackInstantTime, instantTime);
}
@@ -1653,7 +1657,7 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
}
}
- private void validateRollback(String commitToRollbackInstantTime) {
+ private void validateRollback(HoodieInstant commitToRollbackInstant) {
// Find the deltacommits since the last compaction
Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
@@ -1667,7 +1671,9 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
// Hence, this case implies a rollback of completed commit which should
actually be handled using restore.
if (compactionInstant.getAction().equals(COMMIT_ACTION)) {
final String compactionInstantTime = compactionInstant.requestedTime();
- if (commitToRollbackInstantTime.length() ==
compactionInstantTime.length() &&
LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime))
{
+ final String commitToRollbackInstantTime =
commitToRollbackInstant.requestedTime();
+ if (commitToRollbackInstantTime.length() ==
compactionInstantTime.length()
+ && compareTimestamps(commitToRollbackInstant.getCompletionTime(),
LESSER_THAN_OR_EQUALS, compactionInstantTime)) {
throw new HoodieMetadataException(
String.format("Commit being rolled back %s is earlier than the
latest compaction %s. There are %d deltacommits after this compaction: %s",
commitToRollbackInstantTime, compactionInstantTime,
deltacommitsSinceCompaction.countInstants(),
deltacommitsSinceCompaction.getInstants())
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
index 2c86a6c7b312..7b91d197e6da 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
@@ -22,6 +22,9 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -30,6 +33,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.MockedStatic;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -108,4 +116,44 @@ class TestHoodieBackedTableMetadataWriter {
.build();
assertSame(mockMetaClient,
HoodieBackedTableMetadataWriter.rollbackFailedWrites(lazyWriteConfig,
mockWriteClient, mockMetaClient));
}
+
+ @Test
+ void testValidateRollbackForMDT() throws Exception {
+ List<HoodieInstant> instants = new ArrayList<>();
+
+
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.DELTA_COMMIT_ACTION, "20250925012123905"));
+
+
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "20250925012447357", "20250925013432341"));
+
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "20250925012518125", "20250925012831379"));
+
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "20250925012851950", "20250925013157886"));
+
+
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "20250925012523368", "20250925015000000"));
+
+ // a deltacommit instant requested before the compaction request time and
finished after it.
+ HoodieInstant instantToRollback =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "20250925012123804", "20250925014634434");
+ instants.add(instantToRollback);
+
+ HoodieActiveTimeline timeline = createMockTimeline(instants);
+
+ HoodieBackedTableMetadataWriter<?, ?> writer =
mock(HoodieBackedTableMetadataWriter.class);
+
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+ when(mockMetaClient.getActiveTimeline()).thenReturn(timeline);
+
+ java.lang.reflect.Field metadataMetaClientField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("metadataMetaClient");
+ metadataMetaClientField.setAccessible(true);
+ metadataMetaClientField.set(writer, mockMetaClient);
+
+ java.lang.reflect.Method validateRollbackMethod =
HoodieBackedTableMetadataWriter.class.getDeclaredMethod("validateRollback",
HoodieInstant.class);
+ validateRollbackMethod.setAccessible(true);
+
+ assertDoesNotThrow(() -> validateRollbackMethod.invoke(writer,
instantToRollback));
+ }
+
+ @SuppressWarnings("deprecation")
+ private HoodieActiveTimeline createMockTimeline(List<HoodieInstant>
instants) {
+ ActiveTimelineV2 timeline = new ActiveTimelineV2();
+ timeline.setInstants(instants);
+ return timeline;
+ }
}