This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9ea0ead908 fix(flink): Trigger a failover after pending instants recommitted for both global and partitioned RLI (#18793)
ed9ea0ead908 is described below

commit ed9ea0ead908e3765b6c938da121e96ace13ba38
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jun 2 08:47:25 2026 +0800

    fix(flink): Trigger a failover after pending instants recommitted for both global and partitioned RLI (#18793)
---
 .../apache/hudi/configuration/OptionsResolver.java |  4 ----
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 10 +++++-----
 .../sink/TestStreamWriteOperatorCoordinator.java   | 23 ++++++++++++++++++++++
 .../hudi/sink/utils/BulkInsertFunctionWrapper.java |  4 +---
 .../hudi/sink/utils/InsertFunctionWrapper.java     | 20 +++++++++++++++----
 .../sink/utils/StreamWriteFunctionWrapper.java     |  3 ---
 6 files changed, 45 insertions(+), 19 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 66e6e2815a5a..74c5257cf44f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -232,10 +232,6 @@ public class OptionsResolver {
     return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
   }
 
-  public static boolean isRLIWithBootstrap(Configuration conf) {
-    return isGlobalRecordLevelIndex(conf) && 
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
-  }
-
   /**
    * Returns whether it is a MERGE_ON_READ table, and updates by bucket index.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 0a10d6d7b88d..7b9095258903 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -330,7 +330,7 @@ public class StreamWriteOperatorCoordinator
       // resetToCheckpoint() is called in two cases:
       // 1. The job is restarted from state, start() will be called later.
       // 2. The job is recovered from global failover. The coordinator is 
already started, and start() will not be called again.
-      if (executor != null && tableState.isRLIWithBootstrap) {
+      if (executor != null && tableState.isRecordLevelIndex) {
         // use sync execution here to make sure the recommitting finishes 
before RLI bootstrapping
         this.executor.executeSync(this::restoreEvents, "Recommit pending 
instants on resetting to checkpoint: %s.", checkpointID);
       }
@@ -373,7 +373,7 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void subtaskReset(int i, long resetCkpId) {
     // There exists pending instants waiting for recommiting.
-    if (tableState.isRLIWithBootstrap && 
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
+    if (tableState.isRecordLevelIndex && 
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
       // use sync execution here to make sure the recommitting finishes before 
RLI bootstrapping
       executor.executeSync(() -> commitInstants(resetCkpId), "Recommit pending 
instants on resetting subtask %s to checkpoint: %s.", i, resetCkpId);
     }
@@ -565,7 +565,7 @@ public class StreamWriteOperatorCoordinator
     if (eventBuffer.allBootstrapEventsReceived()) {
       // start to recommit the instant.
       boolean committed = recommitInstant(event.getCheckpointId(), 
event.getInstantTime(), eventBuffer);
-      if (committed && tableState.isRLIWithBootstrap) {
+      if (committed && tableState.isRecordLevelIndex) {
         context.failJob(new HoodieException("There are pending instants 
recommitted on job restart,"
             + "triggering a global failover so that RLI bootstrap operator can 
load the record level index completely."));
       }
@@ -759,7 +759,7 @@ public class StreamWriteOperatorCoordinator
     final boolean syncMetadata;
     final boolean isDeltaTimeCompaction;
     final boolean isStreamingIndexWriteEnabled;
-    final boolean isRLIWithBootstrap;
+    final boolean isRecordLevelIndex;
 
     private TableState(Configuration conf) {
       this.operationType = 
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -773,7 +773,7 @@ public class StreamWriteOperatorCoordinator
       this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
       this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
       this.isStreamingIndexWriteEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
-      this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
+      this.isRecordLevelIndex = OptionsResolver.isGlobalRecordLevelIndex(conf) 
|| OptionsResolver.isRecordLevelIndex(conf);
     }
 
     public static TableState create(Configuration conf) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 981cacfc54bb..f8205f57f342 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -72,6 +72,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -307,6 +308,18 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Recommits the instant with partial uncommitted events", 
lastCompleted, is(instant));
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"GLOBAL_RECORD_LEVEL_INDEX", "RECORD_LEVEL_INDEX"})
+  public void testRecordLevelIndexFlag(String indexType) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(new File(tempFile, 
indexType).getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_TYPE, indexType);
+    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 1);
+
+    try (StreamWriteOperatorCoordinator coordinator = createCoordinator(conf, 
1)) {
+      assertTrue(getRecordLevelIndexFlag(coordinator));
+    }
+  }
+
   @Test
   public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws 
Exception {
     // reset
@@ -776,6 +789,16 @@ public class TestStreamWriteOperatorCoordinator {
     assertTrue(context.isJobFailed(), message);
   }
 
+  private static boolean 
getRecordLevelIndexFlag(StreamWriteOperatorCoordinator coordinator) throws 
Exception {
+    Field tableStateField = 
StreamWriteOperatorCoordinator.class.getDeclaredField("tableState");
+    tableStateField.setAccessible(true);
+    Object tableState = tableStateField.get(coordinator);
+
+    Field recordLevelIndexField = 
tableState.getClass().getDeclaredField("isRecordLevelIndex");
+    recordLevelIndexField.setAccessible(true);
+    return recordLevelIndexField.getBoolean(tableState);
+  }
+
   // -------------------------------------------------------------------------
   //  Kafka Offset Tests
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 26f79d870d66..159ead016fb5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -167,9 +167,7 @@ public class BulkInsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public void coordinatorFails() throws Exception {
-    this.coordinator.close();
-    this.coordinator.start();
-    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    // Do nothing since there is no state recovery for bulk insert.
   }
 
   public void restartCoordinator() throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 8c0369a889c7..9a8262155973 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -49,6 +49,8 @@ import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -70,6 +72,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   private final boolean asyncClustering;
   private ClusteringFunctionWrapper clusteringFunctionWrapper;
+  private final TreeMap<Long, byte[]> coordinatorStateStore;
 
   /**
    * Append write function.
@@ -97,6 +100,7 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
     this.stateInitializationContext = new MockStateInitializationContext();
+    this.coordinatorStateStore = new TreeMap<>();
 
     this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
     StreamConfig streamConfig = new StreamConfig(conf);
@@ -142,8 +146,10 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public void checkpointFunction(long checkpointId) throws Exception {
+    CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
     // checkpoint the coordinator first
-    this.coordinator.checkpointCoordinator(checkpointId, new 
CompletableFuture<>());
+    this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
+    this.coordinatorStateStore.put(checkpointId, completableFuture.get());
 
     writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
     stateInitializationContext.checkpointBegin(checkpointId);
@@ -167,9 +173,15 @@ public class InsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public void coordinatorFails() throws Exception {
-    this.coordinator.close();
-    this.coordinator.start();
-    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    resetCoordinatorToCheckpoint();
+  }
+
+  private void resetCoordinatorToCheckpoint() {
+    if (coordinatorStateStore.isEmpty()) {
+      return;
+    }
+    Map.Entry<Long, byte[]> latestState = 
this.coordinatorStateStore.lastEntry();
+    this.coordinator.resetToCheckpoint(latestState.getKey(), 
latestState.getValue());
   }
 
   public void restartCoordinator() throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 52b8e800cfd3..baab42555c1e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -368,10 +368,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public void coordinatorFails() throws Exception {
-    this.coordinator.close();
     resetCoordinatorToCheckpoint();
-    this.coordinator.start();
-    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
   }
 
   public void restartCoordinator() throws Exception {

Reply via email to