This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-1.2.0 by this push:
new 7f0d64bfe771 fix(flink): Trigger a failover after pending instants
recommitted for both global and partitioned RLI (#18789)
7f0d64bfe771 is described below
commit 7f0d64bfe7718ab5218a9baca3230d041346b3b9
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed May 20 21:11:04 2026 +0800
fix(flink): Trigger a failover after pending instants recommitted for both
global and partitioned RLI (#18789)
---
.../apache/hudi/configuration/OptionsResolver.java | 4 ----
.../hudi/sink/StreamWriteOperatorCoordinator.java | 10 +++++-----
.../sink/TestStreamWriteOperatorCoordinator.java | 23 ++++++++++++++++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 3 +++
4 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 774a52231e8d..628474bde7d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -232,10 +232,6 @@ public class OptionsResolver {
return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
}
- public static boolean isRLIWithBootstrap(Configuration conf) {
- return isGlobalRecordLevelIndex(conf) &&
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
- }
-
/**
* Returns whether it is a MERGE_ON_READ table, and updates by bucket index.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 82fcb3ed5c57..7e9d8cfa4cf5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -330,7 +330,7 @@ public class StreamWriteOperatorCoordinator
// resetToCheckpoint() is called in two cases:
// 1. The job is restarted from state, start() will be called later.
// 2. The job is recovered from global failover. The coordinator is
already started, and start() will not be called again.
- if (executor != null && tableState.isRLIWithBootstrap) {
+ if (executor != null && tableState.isRecordLevelIndex) {
// use sync execution here to make sure the recommitting finishes
before RLI bootstrapping
this.executor.executeSync(this::restoreEvents, "Recommit pending
instants on resetting to checkpoint: %s.", checkpointID);
}
@@ -373,7 +373,7 @@ public class StreamWriteOperatorCoordinator
@Override
public void subtaskReset(int i, long resetCkpId) {
// There exists pending instants waiting for recommiting.
- if (tableState.isRLIWithBootstrap &&
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
+ if (tableState.isRecordLevelIndex &&
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
// use sync execution here to make sure the recommitting finishes before
RLI bootstrapping
executor.executeSync(() -> commitInstants(resetCkpId), "Recommit pending
instants on resetting subtask %s to checkpoint: %s.", i, resetCkpId);
}
@@ -565,7 +565,7 @@ public class StreamWriteOperatorCoordinator
if (eventBuffer.allBootstrapEventsReceived()) {
// start to recommit the instant.
boolean committed = recommitInstant(event.getCheckpointId(),
event.getInstantTime(), eventBuffer);
- if (committed && tableState.isRLIWithBootstrap) {
+ if (committed && tableState.isRecordLevelIndex) {
context.failJob(new HoodieException("There are pending instants
recommitted on job restart,"
+ "triggering a global failover so that RLI bootstrap operator can
load the record level index completely."));
}
@@ -759,7 +759,7 @@ public class StreamWriteOperatorCoordinator
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
final boolean isStreamingIndexWriteEnabled;
- final boolean isRLIWithBootstrap;
+ final boolean isRecordLevelIndex;
private TableState(Configuration conf) {
this.operationType =
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -773,7 +773,7 @@ public class StreamWriteOperatorCoordinator
this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
this.isStreamingIndexWriteEnabled =
OptionsResolver.isStreamingIndexWriteEnabled(conf);
- this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
+ this.isRecordLevelIndex = OptionsResolver.isGlobalRecordLevelIndex(conf)
|| OptionsResolver.isRecordLevelIndex(conf);
}
public static TableState create(Configuration conf) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index a5ccb2ffb20f..d5903ef946dc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -72,6 +72,7 @@ import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -307,6 +308,18 @@ public class TestStreamWriteOperatorCoordinator {
assertThat("Recommits the instant with partial uncommitted events",
lastCompleted, is(instant));
}
+ @ParameterizedTest
+ @ValueSource(strings = {"GLOBAL_RECORD_LEVEL_INDEX", "RECORD_LEVEL_INDEX"})
+ public void testRecordLevelIndexFlag(String indexType) throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(new File(tempFile,
indexType).getAbsolutePath());
+ conf.set(FlinkOptions.INDEX_TYPE, indexType);
+ conf.set(FlinkOptions.INDEX_WRITE_TASKS, 1);
+
+ try (StreamWriteOperatorCoordinator coordinator = createCoordinator(conf,
1)) {
+ assertTrue(getRecordLevelIndexFlag(coordinator));
+ }
+ }
+
@Test
public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws
Exception {
// reset
@@ -776,6 +789,16 @@ public class TestStreamWriteOperatorCoordinator {
assertTrue(context.isJobFailed(), message);
}
+ private static boolean
getRecordLevelIndexFlag(StreamWriteOperatorCoordinator coordinator) throws
Exception {
+ Field tableStateField =
StreamWriteOperatorCoordinator.class.getDeclaredField("tableState");
+ tableStateField.setAccessible(true);
+ Object tableState = tableStateField.get(coordinator);
+
+ Field recordLevelIndexField =
tableState.getClass().getDeclaredField("isRecordLevelIndex");
+ recordLevelIndexField.setAccessible(true);
+ return recordLevelIndexField.getBoolean(tableState);
+ }
+
// -------------------------------------------------------------------------
// Kafka Offset Tests
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index dec0936f05a6..7b1684c8e629 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -369,6 +369,9 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
public void coordinatorFails() throws Exception {
this.coordinator.close();
+ if (isStreamingWriteIndexEnabled) {
+ this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ }
resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));