This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e6723a8b2af5 fix(metadata): Allow metadata table bootstrap when
pending commits are being rolled back (#18033)
e6723a8b2af5 is described below
commit e6723a8b2af51d71d9a71cb0321e4bab30585d89
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Mar 11 10:05:19 2026 -0700
fix(metadata): Allow metadata table bootstrap when pending commits are
being rolled back (#18033)
When a pending commit exists on the data table timeline and the metadata
table folder has been deleted, ingestion continually fails because the
metadata table cannot be bootstrapped. This fix allows the metadata table
to be bootstrapped if the pending commits are being rolled back.
Summary and Changelog
Summary: Allow metadata table initialization to proceed when pending data
instants are being rolled back.
Changelog:
- Modified
  HoodieBackedTableMetadataWriterTableVersionSix.shouldInitializeFromFilesystem()
  to check for pending rollback instants.
- If all blocking pending data instants have corresponding pending rollback
  instants, allow the metadata table bootstrap to proceed.
- Added informational logging when initialization is allowed due to pending
  rollbacks.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
...ieBackedTableMetadataWriterTableVersionSix.java | 36 ++-
...ieBackedTableMetadataWriterTableVersionSix.java | 260 +++++++++++++++++++++
2 files changed, 288 insertions(+), 8 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
index 2abeaec35a6d..a7f51400300e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java
@@ -87,15 +87,35 @@ public abstract class
HoodieBackedTableMetadataWriterTableVersionSix<I, O> exten
@Override
boolean shouldInitializeFromFilesystem(Set<String> pendingDataInstants,
Option<String> inflightInstantTimestamp) {
- if (pendingDataInstants.stream()
- .anyMatch(i -> !inflightInstantTimestamp.isPresent() ||
!i.equals(inflightInstantTimestamp.get()))) {
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
- LOG.warn("Cannot initialize metadata table as operation(s) are in
progress on the dataset: {}",
- Arrays.toString(pendingDataInstants.toArray()));
- return false;
- } else {
- return true;
+ // Check if there are pending data instants that are not the current
inflight instant
+ Set<String> blockingPendingInstants = pendingDataInstants.stream()
+ .filter(i -> !inflightInstantTimestamp.isPresent() ||
!i.equals(inflightInstantTimestamp.get()))
+ .collect(Collectors.toSet());
+
+ if (!blockingPendingInstants.isEmpty()) {
+ // If a pending commit is being rolled back, allow the bootstrap to
proceed.
+ // Check for pending rollback instants that match the blocking pending
instants.
+ Set<String> pendingRollbackInstants = dataMetaClient.getActiveTimeline()
+ .getRollbackTimeline()
+ .getInstantsAsStream()
+ .filter(i -> !i.isCompleted())
+ .map(HoodieInstant::requestedTime)
+ .collect(Collectors.toSet());
+
+ // Check if all blocking pending instants have a corresponding pending
rollback
+ boolean allBlockingInstantsBeingRolledBack =
!pendingRollbackInstants.isEmpty()
+ &&
blockingPendingInstants.stream().allMatch(pendingRollbackInstants::contains);
+
+ if (!allBlockingInstantsBeingRolledBack) {
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+ LOG.warn("Cannot initialize metadata table as operation(s) are in
progress on the dataset: {}",
+ Arrays.toString(blockingPendingInstants.toArray()));
+ return false;
+ }
+ LOG.info("Allowing metadata table initialization as pending instants {}
are being rolled back",
+ Arrays.toString(blockingPendingInstants.toArray()));
}
+ return true;
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriterTableVersionSix.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriterTableVersionSix.java
new file mode 100644
index 000000000000..804acc5f2ef3
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantGenerator;
+import org.apache.hudi.common.table.timeline.versioning.v1.ActiveTimelineV1;
+import org.apache.hudi.common.table.timeline.versioning.v1.InstantGeneratorV1;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link HoodieBackedTableMetadataWriterTableVersionSix}.
+ */
+class TestHoodieBackedTableMetadataWriterTableVersionSix {
+
+ // Use V1 instant generator for table version 6 (V2 is for table version 8)
+ private static final InstantGenerator INSTANT_GENERATOR = new
InstantGeneratorV1();
+
+ /**
+ * Test shouldInitializeFromFilesystem returns true when there are no
pending data instants.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_noPendingInstants() throws Exception
{
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline mockTimeline =
createMockTimeline(Collections.emptyList());
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ Set<String> pendingDataInstants = Collections.emptySet();
+ Option<String> inflightInstantTimestamp = Option.empty();
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertTrue(result, "Should allow initialization when there are no pending
data instants");
+ }
+
+ /**
+ * Test shouldInitializeFromFilesystem returns true when the only pending
instant is the current inflight instant.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_onlyCurrentInflightInstant() throws
Exception {
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline mockTimeline =
createMockTimeline(Collections.emptyList());
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ String currentInflightTime = "20250101120000000";
+ Set<String> pendingDataInstants = new HashSet<>();
+ pendingDataInstants.add(currentInflightTime);
+ Option<String> inflightInstantTimestamp = Option.of(currentInflightTime);
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertTrue(result, "Should allow initialization when only pending instant
is the current inflight instant");
+ }
+
+ /**
+ * Test shouldInitializeFromFilesystem returns false when there are blocking
pending instants with no rollbacks.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_blockingInstantsWithNoRollbacks()
throws Exception {
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ // Empty rollback timeline
+ HoodieActiveTimeline mockTimeline =
createMockTimeline(Collections.emptyList());
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ Set<String> pendingDataInstants = new HashSet<>();
+ pendingDataInstants.add("20250101110000000");
+ pendingDataInstants.add("20250101120000000");
+ Option<String> inflightInstantTimestamp = Option.empty();
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertFalse(result, "Should block initialization when there are pending
instants without rollbacks");
+ }
+
+ /**
+ * Test shouldInitializeFromFilesystem returns true when all blocking
pending instants have corresponding pending rollbacks.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_allBlockingInstantsBeingRolledBack()
throws Exception {
+ String pendingInstant1 = "20250101110000000";
+ String pendingInstant2 = "20250101120000000";
+
+ // Create pending rollback instants for all blocking instants
+ List<HoodieInstant> rollbackInstants = new ArrayList<>();
+ rollbackInstants.add(INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
pendingInstant1));
+ rollbackInstants.add(INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION,
pendingInstant2));
+
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline mockTimeline = createMockTimeline(rollbackInstants);
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ Set<String> pendingDataInstants = new HashSet<>();
+ pendingDataInstants.add(pendingInstant1);
+ pendingDataInstants.add(pendingInstant2);
+ Option<String> inflightInstantTimestamp = Option.empty();
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertTrue(result, "Should allow initialization when all blocking instants
are being rolled back");
+ }
+
+ /**
+ * Test shouldInitializeFromFilesystem returns false when only some blocking
pending instants have rollbacks.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_partialRollbacks() throws Exception {
+ String pendingInstant1 = "20250101110000000";
+ String pendingInstant2 = "20250101120000000";
+
+ // Create pending rollback instant for only one of the blocking instants
+ List<HoodieInstant> rollbackInstants = new ArrayList<>();
+ rollbackInstants.add(INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
pendingInstant1));
+
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline mockTimeline = createMockTimeline(rollbackInstants);
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ Set<String> pendingDataInstants = new HashSet<>();
+ pendingDataInstants.add(pendingInstant1);
+ pendingDataInstants.add(pendingInstant2);
+ Option<String> inflightInstantTimestamp = Option.empty();
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertFalse(result, "Should block initialization when only some blocking
instants have rollbacks");
+ }
+
+ /**
+ * Test shouldInitializeFromFilesystem with a mix of current inflight
instant and other pending instants being rolled back.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_mixedInflightAndRollbacks() throws
Exception {
+ String currentInflightTime = "20250101130000000";
+ String pendingInstant1 = "20250101110000000";
+ String pendingInstant2 = "20250101120000000";
+
+ // Create pending rollback instants for the blocking instants (not
including current inflight)
+ List<HoodieInstant> rollbackInstants = new ArrayList<>();
+ rollbackInstants.add(INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
pendingInstant1));
+ rollbackInstants.add(INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION,
pendingInstant2));
+
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline mockTimeline = createMockTimeline(rollbackInstants);
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ Set<String> pendingDataInstants = new HashSet<>();
+ pendingDataInstants.add(currentInflightTime);
+ pendingDataInstants.add(pendingInstant1);
+ pendingDataInstants.add(pendingInstant2);
+ Option<String> inflightInstantTimestamp = Option.of(currentInflightTime);
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertTrue(result, "Should allow initialization when current inflight is
excluded and other blocking instants are being rolled back");
+ }
+
+ /**
+ * Test shouldInitializeFromFilesystem returns false when completed rollback
instants exist
+ * but no pending rollbacks for blocking instants.
+ */
+ @Test
+ void testShouldInitializeFromFilesystem_completedRollbacksDoNotCount()
throws Exception {
+ String pendingInstant1 = "20250101110000000";
+
+ // Create only completed rollback instant (should not count as pending)
+ List<HoodieInstant> rollbackInstants = new ArrayList<>();
+ rollbackInstants.add(INSTANT_GENERATOR.createNewInstant(
+ HoodieInstant.State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION,
pendingInstant1, "20250101110100000"));
+
+ HoodieTableMetaClient mockDataMetaClient =
mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline mockTimeline = createMockTimeline(rollbackInstants);
+ when(mockDataMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
createMockWriter(mockDataMetaClient);
+
+ Set<String> pendingDataInstants = new HashSet<>();
+ pendingDataInstants.add(pendingInstant1);
+ Option<String> inflightInstantTimestamp = Option.empty();
+
+ boolean result = invokeShouldInitializeFromFilesystem(writer,
pendingDataInstants, inflightInstantTimestamp);
+ assertFalse(result, "Should block initialization when rollbacks are
completed, not pending");
+ }
+
+ private HoodieBackedTableMetadataWriterTableVersionSix<?, ?>
createMockWriter(HoodieTableMetaClient dataMetaClient) throws Exception {
+ // Use CALLS_REAL_METHODS so that shouldInitializeFromFilesystem executes
the real logic
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer =
mock(HoodieBackedTableMetadataWriterTableVersionSix.class, CALLS_REAL_METHODS);
+
+ // Set the dataMetaClient field
+ java.lang.reflect.Field dataMetaClientField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("dataMetaClient");
+ dataMetaClientField.setAccessible(true);
+ dataMetaClientField.set(writer, dataMetaClient);
+
+ // Set the metrics field to avoid NPE
+ java.lang.reflect.Field metricsField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("metrics");
+ metricsField.setAccessible(true);
+ metricsField.set(writer, Option.empty());
+
+ return writer;
+ }
+
+ private boolean invokeShouldInitializeFromFilesystem(
+ HoodieBackedTableMetadataWriterTableVersionSix<?, ?> writer,
+ Set<String> pendingDataInstants,
+ Option<String> inflightInstantTimestamp) {
+ // The test class is in the same package, so we can call the
package-private method directly
+ return writer.shouldInitializeFromFilesystem(pendingDataInstants,
inflightInstantTimestamp);
+ }
+
+ @SuppressWarnings("deprecation")
+ private HoodieActiveTimeline createMockTimeline(List<HoodieInstant>
instants) {
+ // Use V1 timeline for table version 6 (V2 is for table version 8)
+ ActiveTimelineV1 timeline = new ActiveTimelineV1();
+ timeline.setInstants(instants);
+ return timeline;
+ }
+}