This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-1.2.0 by this push:
     new 7f0d64bfe771 fix(flink): Trigger a failover after pending instants recommitted for both global and partitioned RLI (#18789)
7f0d64bfe771 is described below

commit 7f0d64bfe7718ab5218a9baca3230d041346b3b9
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed May 20 21:11:04 2026 +0800

    fix(flink): Trigger a failover after pending instants recommitted for both global and partitioned RLI (#18789)
---
 .../apache/hudi/configuration/OptionsResolver.java |  4 ----
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 10 +++++-----
 .../sink/TestStreamWriteOperatorCoordinator.java   | 23 ++++++++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  3 +++
 4 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 774a52231e8d..628474bde7d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -232,10 +232,6 @@ public class OptionsResolver {
     return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
   }
 
-  public static boolean isRLIWithBootstrap(Configuration conf) {
-    return isGlobalRecordLevelIndex(conf) && 
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
-  }
-
   /**
    * Returns whether it is a MERGE_ON_READ table, and updates by bucket index.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 82fcb3ed5c57..7e9d8cfa4cf5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -330,7 +330,7 @@ public class StreamWriteOperatorCoordinator
       // resetToCheckpoint() is called in two cases:
       // 1. The job is restarted from state, start() will be called later.
       // 2. The job is recovered from global failover. The coordinator is 
already started, and start() will not be called again.
-      if (executor != null && tableState.isRLIWithBootstrap) {
+      if (executor != null && tableState.isRecordLevelIndex) {
         // use sync execution here to make sure the recommitting finishes 
before RLI bootstrapping
         this.executor.executeSync(this::restoreEvents, "Recommit pending 
instants on resetting to checkpoint: %s.", checkpointID);
       }
@@ -373,7 +373,7 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void subtaskReset(int i, long resetCkpId) {
     // There exists pending instants waiting for recommiting.
-    if (tableState.isRLIWithBootstrap && 
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
+    if (tableState.isRecordLevelIndex && 
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
       // use sync execution here to make sure the recommitting finishes before 
RLI bootstrapping
       executor.executeSync(() -> commitInstants(resetCkpId), "Recommit pending 
instants on resetting subtask %s to checkpoint: %s.", i, resetCkpId);
     }
@@ -565,7 +565,7 @@ public class StreamWriteOperatorCoordinator
     if (eventBuffer.allBootstrapEventsReceived()) {
       // start to recommit the instant.
       boolean committed = recommitInstant(event.getCheckpointId(), 
event.getInstantTime(), eventBuffer);
-      if (committed && tableState.isRLIWithBootstrap) {
+      if (committed && tableState.isRecordLevelIndex) {
         context.failJob(new HoodieException("There are pending instants 
recommitted on job restart,"
             + "triggering a global failover so that RLI bootstrap operator can 
load the record level index completely."));
       }
@@ -759,7 +759,7 @@ public class StreamWriteOperatorCoordinator
     final boolean syncMetadata;
     final boolean isDeltaTimeCompaction;
     final boolean isStreamingIndexWriteEnabled;
-    final boolean isRLIWithBootstrap;
+    final boolean isRecordLevelIndex;
 
     private TableState(Configuration conf) {
       this.operationType = 
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -773,7 +773,7 @@ public class StreamWriteOperatorCoordinator
       this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
       this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
       this.isStreamingIndexWriteEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
-      this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
+      this.isRecordLevelIndex = OptionsResolver.isGlobalRecordLevelIndex(conf) 
|| OptionsResolver.isRecordLevelIndex(conf);
     }
 
     public static TableState create(Configuration conf) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index a5ccb2ffb20f..d5903ef946dc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -72,6 +72,7 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -307,6 +308,18 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Recommits the instant with partial uncommitted events", 
lastCompleted, is(instant));
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"GLOBAL_RECORD_LEVEL_INDEX", "RECORD_LEVEL_INDEX"})
+  public void testRecordLevelIndexFlag(String indexType) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(new File(tempFile, 
indexType).getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_TYPE, indexType);
+    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 1);
+
+    try (StreamWriteOperatorCoordinator coordinator = createCoordinator(conf, 
1)) {
+      assertTrue(getRecordLevelIndexFlag(coordinator));
+    }
+  }
+
   @Test
   public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws 
Exception {
     // reset
@@ -776,6 +789,16 @@ public class TestStreamWriteOperatorCoordinator {
     assertTrue(context.isJobFailed(), message);
   }
 
+  private static boolean 
getRecordLevelIndexFlag(StreamWriteOperatorCoordinator coordinator) throws 
Exception {
+    Field tableStateField = 
StreamWriteOperatorCoordinator.class.getDeclaredField("tableState");
+    tableStateField.setAccessible(true);
+    Object tableState = tableStateField.get(coordinator);
+
+    Field recordLevelIndexField = 
tableState.getClass().getDeclaredField("isRecordLevelIndex");
+    recordLevelIndexField.setAccessible(true);
+    return recordLevelIndexField.getBoolean(tableState);
+  }
+
   // -------------------------------------------------------------------------
   //  Kafka Offset Tests
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index dec0936f05a6..7b1684c8e629 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -369,6 +369,9 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   public void coordinatorFails() throws Exception {
     this.coordinator.close();
+    if (isStreamingWriteIndexEnabled) {
+      this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    }
     resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));

Reply via email to