This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d69826cc5dc2 fix(flink): Trigger a failover after pending instants 
recommitted to b… (#18434)
d69826cc5dc2 is described below

commit d69826cc5dc263bce8d3edc8b69d4e7ceba3c045
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Apr 3 09:48:06 2026 +0800

    fix(flink): Trigger a failover after pending instants recommitted to b… 
(#18434)
    
    * fix(flink): Trigger a failover after pending instants recommitted to 
bootstrap RLI completely
    * fix comments
---
 .../apache/hudi/configuration/FlinkOptions.java    |  8 --
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 48 +++++++-----
 .../hudi/sink/bootstrap/RLIBootstrapOperator.java  | 89 +---------------------
 .../org/apache/hudi/sink/event/Correspondent.java  | 38 ---------
 .../org/apache/hudi/sink/utils/EventBuffers.java   | 15 +---
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  7 +-
 .../sink/TestStreamWriteOperatorCoordinator.java   | 28 -------
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java | 51 +++++++++++++
 .../hudi/sink/utils/MockStateSnapshotContext.java  | 41 ++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     | 48 +++++++++++-
 .../apache/hudi/sink/utils/TestEventBuffers.java   | 42 ----------
 .../org/apache/hudi/sink/utils/TestWriteBase.java  | 13 ++++
 12 files changed, 180 insertions(+), 248 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index f2907b06d66b..cf382e748fe8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -908,14 +908,6 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(ClientIds.INIT_CLIENT_ID)
       .withDescription("Unique identifier used to distinguish different writer 
pipelines for concurrent mode");
 
-  // this is only for internal use
-  @AdvancedConfig
-  public static final ConfigOption<String> WRITE_OPERATOR_UID = ConfigOptions
-      .key("write.operator.uid")
-      .stringType()
-      .noDefaultValue()
-      .withDescription("The write operator uid used as the uid for hudi sink 
transformation.");
-
   // ------------------------------------------------------------------------
   //  Compaction Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 1c1add8b721b..82fcb3ed5c57 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -327,6 +327,13 @@ public class StreamWriteOperatorCoordinator
     if (checkpointData != null) {
       initEventBufferIfNecessary();
       
this.eventBuffers.addEventsToBuffer(SerializationUtils.deserialize(checkpointData));
+      // resetToCheckpoint() is called in two cases:
+      // 1. The job is restarted from state, start() will be called later.
+      // 2. The job is recovered from global failover. The coordinator is 
already started, and start() will not be called again.
+      if (executor != null && tableState.isRLIWithBootstrap) {
+        // use sync execution here to make sure the recommitting finishes 
before RLI bootstrapping
+        this.executor.executeSync(this::restoreEvents, "Recommit pending 
instants on resetting to checkpoint: %s.", checkpointID);
+      }
     }
   }
 
@@ -364,8 +371,12 @@ public class StreamWriteOperatorCoordinator
   }
 
   @Override
-  public void subtaskReset(int i, long l) {
-    // no operation
+  public void subtaskReset(int i, long resetCkpId) {
+    // There exist pending instants waiting to be recommitted.
+    if (tableState.isRLIWithBootstrap && 
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
+      // use sync execution here to make sure the recommitting finishes before 
RLI bootstrapping
+      executor.executeSync(() -> commitInstants(resetCkpId), "Recommit pending 
instants on resetting subtask %s to checkpoint: %s.", i, resetCkpId);
+    }
   }
 
   @VisibleForTesting
@@ -391,9 +402,6 @@ public class StreamWriteOperatorCoordinator
     if (request instanceof Correspondent.InflightInstantsRequest) {
       return 
handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
     }
-    if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
-      return 
handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest) 
request);
-    }
     throw new HoodieException("Unexpected coordination request type: " + 
request.getClass().getSimpleName());
   }
 
@@ -421,16 +429,6 @@ public class StreamWriteOperatorCoordinator
     return 
CompletableFuture.completedFuture(CoordinationResponseSerDe.wrap(coordinationResponse));
   }
 
-  private CompletableFuture<CoordinationResponse> 
handleAwaitPendingInstantsRequest(Correspondent.AwaitPendingInstantsRequest 
request) {
-    CompletableFuture<CoordinationResponse> response = new 
CompletableFuture<>();
-    instantRequestExecutor.execute(() -> {
-      // wait until receiving any bootstrap event.
-      eventBuffers.awaitPrevInstantsToComplete(request.getCheckpointId());
-      
response.complete(CoordinationResponseSerDe.wrap(Correspondent.AwaitPendingInstantsResponse.getInstance()));
-    }, "await pending instants to complete");
-    return response;
-  }
-
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
@@ -532,16 +530,16 @@ public class StreamWriteOperatorCoordinator
    * Recommits the last inflight instant if the write metadata checkpoint 
successfully
    * but was not committed due to some rare cases.
    */
-  private void recommitInstant(long checkpointId, String instant, EventBuffer 
bootstrapBuffer) {
+  private boolean recommitInstant(long checkpointId, String instant, 
EventBuffer bootstrapBuffer) {
     HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
-    recommitInstant(completedTimeline, checkpointId, instant, bootstrapBuffer);
+    return recommitInstant(completedTimeline, checkpointId, instant, 
bootstrapBuffer);
   }
 
   /**
    * Recommits the last inflight instant if the write metadata checkpoint 
successfully
    * but was not committed due to some rare cases.
    */
-  private void recommitInstant(HoodieTimeline completedTimeline, long 
checkpointId, String instant, EventBuffer bootstrapBuffer) {
+  private boolean recommitInstant(HoodieTimeline completedTimeline, long 
checkpointId, String instant, EventBuffer bootstrapBuffer) {
     if (!completedTimeline.containsInstant(instant)) {
       log.info("Recommit instant {}", instant);
       // Recommit should start heartbeat for lazy failed writes clean policy 
to avoid aborting for heartbeat expired;
@@ -549,7 +547,11 @@ public class StreamWriteOperatorCoordinator
       if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
         writeClient.getHeartbeatClient().start(instant);
       }
-      commitInstant(checkpointId, instant, bootstrapBuffer);
+      return commitInstant(checkpointId, instant, bootstrapBuffer);
+    } else {
+      // clean the corresponding event buffer if the instant is already 
committed.
+      eventBuffers.reset(checkpointId);
+      return false;
     }
   }
 
@@ -562,7 +564,11 @@ public class StreamWriteOperatorCoordinator
     eventBuffer.addBootstrapEvent(event);
     if (eventBuffer.allBootstrapEventsReceived()) {
       // start to recommit the instant.
-      recommitInstant(event.getCheckpointId(), event.getInstantTime(), 
eventBuffer);
+      boolean committed = recommitInstant(event.getCheckpointId(), 
event.getInstantTime(), eventBuffer);
+      if (committed && tableState.isRLIWithBootstrap) {
+        context.failJob(new HoodieException("There are pending instants 
recommitted on job restart,"
+            + "triggering a global failover so that RLI bootstrap operator can 
load the record level index completely."));
+      }
     }
   }
 
@@ -753,6 +759,7 @@ public class StreamWriteOperatorCoordinator
     final boolean syncMetadata;
     final boolean isDeltaTimeCompaction;
     final boolean isStreamingIndexWriteEnabled;
+    final boolean isRLIWithBootstrap;
 
     private TableState(Configuration conf) {
       this.operationType = 
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -766,6 +773,7 @@ public class StreamWriteOperatorCoordinator
       this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
       this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
       this.isStreamingIndexWriteEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
+      this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
     }
 
     public static TableState create(Configuration conf) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
index 55b5d65cf232..515b180d3cad 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
@@ -25,32 +25,19 @@ import 
org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
-import org.apache.hudi.sink.event.Correspondent;
-import org.apache.hudi.sink.utils.OperatorIDGenerator;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.RuntimeContextUtils;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 /**
  * Bootstrap operator that loads record level index (RLI) data from metadata 
table.
@@ -65,60 +52,17 @@ import java.util.stream.StreamSupport;
 public class RLIBootstrapOperator
     extends AbstractBootstrapOperator {
 
-  private final OperatorID dataWriteOperatorId;
-
-  private transient HoodieTableMetaClient metaClient;
   private transient HoodieBackedTableMetadata metadataTable;
-  private transient Correspondent correspondent;
   private transient long loadedCnt;
 
-  /**
-   * The last checkpoint id, starts from -1.
-   */
-  private long checkpointId = -1;
-
-  /**
-   * List state of the JobID.
-   */
-  private transient ListState<JobID> jobIdState;
-
   public RLIBootstrapOperator(Configuration conf) {
     super(conf);
-    String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
-    ValidationUtils.checkArgument(writeOperatorUid != null,
-        "Write operator UID should not be null when index is Record Level 
Index.");
-    this.dataWriteOperatorId = OperatorIDGenerator.fromUid(writeOperatorUid);
-  }
-
-  @Override
-  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
-    super.setup(containingTask, config, output);
-    this.correspondent = Correspondent.getInstance(dataWriteOperatorId,
-        
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway());
   }
 
   @Override
   public void initializeState(StateInitializationContext context) throws 
Exception {
-    this.jobIdState = context.getOperatorStateStore().getListState(
-        new ListStateDescriptor<>(
-            "job-id-state",
-            TypeInformation.of(JobID.class)
-        ));
     loadedCnt = 0;
-
-    int attemptId = RuntimeContextUtils.getAttemptNumber(getRuntimeContext());
-    if (context.isRestored()) {
-      initCheckpointId(attemptId, 
context.getRestoredCheckpointId().orElse(-1L));
-    }
-
-    if (context.isRestored()) {
-      // Wait for pending instants being committed successfully before loading 
the record index
-      log.info("Waiting for pending instants committed before RLI bootstrap.");
-      correspondent.awaitPendingInstantsCommitted(checkpointId);
-      log.info("All pending instants are completed, continue RLI bootstrap.");
-    }
-
-    this.metaClient = StreamerUtil.createMetaClient(conf);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
     this.metadataTable = (HoodieBackedTableMetadata) 
metaClient.getTableFormat().getMetadataFactory().create(
         HoodieFlinkEngineContext.DEFAULT,
         metaClient.getStorage(),
@@ -128,15 +72,6 @@ public class RLIBootstrapOperator
     preLoadRLIRecords();
   }
 
-  @Override
-  public void snapshotState(StateSnapshotContext context) throws Exception {
-    super.snapshotState(context);
-    // Reload the job ID state
-    reloadJobIdState();
-    // Update checkpoint id
-    this.checkpointId = context.getCheckpointId();
-  }
-
   @Override
   public void close() throws Exception {
     closeMetadataTable();
@@ -147,28 +82,6 @@ public class RLIBootstrapOperator
   //  Utilities
   // -------------------------------------------------------------------------
 
-  /**
-   * Reload the job id state as the current job id.
-   */
-  private void reloadJobIdState() throws Exception {
-    this.jobIdState.clear();
-    this.jobIdState.add(RuntimeContextUtils.getJobId(getRuntimeContext()));
-  }
-
-  private void initCheckpointId(int attemptId, long restoredCheckpointId) 
throws Exception {
-    if (attemptId <= 0) {
-      // returns early if the job/task is initially started.
-      return;
-    }
-    JobID currentJobId = RuntimeContextUtils.getJobId(getRuntimeContext());
-    if (StreamSupport.stream(this.jobIdState.get().spliterator(), false)
-        .noneMatch(currentJobId::equals)) {
-      // do not set up the checkpoint id if the state comes from the old job.
-      return;
-    }
-    this.checkpointId = restoredCheckpointId;
-  }
-
   private void preLoadRLIRecords() {
     int taskID = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
     int parallelism = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
index 5d599514bb0b..9bc8f8242ec5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
@@ -100,18 +100,6 @@ public class Correspondent {
     }
   }
 
-  /**
-   * Requests coordinator to wait until all pending instants are committed if 
necessary.
-   */
-  public void awaitPendingInstantsCommitted(long checkpointId) {
-    try {
-      this.gateway.sendRequestToCoordinator(this.operatorID,
-          new 
SerializedValue<>(AwaitPendingInstantsRequest.getInstance(checkpointId))).get();
-    } catch (Exception e) {
-      throw new HoodieException("Error awaiting pending instants completion 
from coordinator", e);
-    }
-  }
-
   /**
    * A request for instant time with a given checkpoint id.
    */
@@ -165,30 +153,4 @@ public class Correspondent {
       return new InflightInstantsResponse(inflightInstants);
     }
   }
-
-  /**
-   * A request to wait until all pending instants are committed in the 
coordinator.
-   */
-  @AllArgsConstructor(access = AccessLevel.PRIVATE)
-  @Getter
-  public static class AwaitPendingInstantsRequest implements 
CoordinationRequest {
-
-    private final long checkpointId;
-
-    public static AwaitPendingInstantsRequest getInstance(long checkpointId) {
-      return new AwaitPendingInstantsRequest(checkpointId);
-    }
-  }
-
-  /**
-   * A response to indicate pending instants are all completed.
-   */
-  @AllArgsConstructor(access = AccessLevel.PRIVATE)
-  @Getter
-  public static class AwaitPendingInstantsResponse implements 
CoordinationResponse {
-
-    public static AwaitPendingInstantsResponse getInstance() {
-      return new AwaitPendingInstantsResponse();
-    }
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index 060a74198f5f..87e1010e2227 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -49,30 +49,25 @@ public class EventBuffers implements Serializable {
   // {checkpointId -> (instant, data write events, index write events)}
   private final Map<Long, Pair<String, EventBuffer>> eventBuffers;
   private final Option<CommitGuard> commitGuardOption;
-  private final Option<CommitGuard> indexBootstrapGuardOption;
   private final int dataWriteParallelism;
   private final int indexWriteParallelism;
 
   private EventBuffers(
       Map<Long, Pair<String, EventBuffer>> eventBuffers,
       Option<CommitGuard> commitGuardOption,
-      Option<CommitGuard> indexBootstrapGuardOption,
       int dataWriteParallelism,
       int indexWriteParallelism) {
     this.eventBuffers = eventBuffers;
     this.commitGuardOption = commitGuardOption;
     this.dataWriteParallelism = dataWriteParallelism;
     this.indexWriteParallelism = indexWriteParallelism;
-    this.indexBootstrapGuardOption = indexBootstrapGuardOption;
   }
 
   public static EventBuffers getInstance(Configuration conf, int 
dataWriteParallelism) {
     final Option<CommitGuard> commitGuardOpt = 
OptionsResolver.isBlockingInstantGeneration(conf)
         ? 
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) 
: Option.empty();
-    final Option<CommitGuard> indexBootstrapGuardOption = 
OptionsResolver.isRLIWithBootstrap(conf)
-        ? 
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) 
: Option.empty();
     final int indexWriteParallelism = 
OptionsResolver.indexWriteParallelism(conf);
-    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt, 
indexBootstrapGuardOption, dataWriteParallelism, indexWriteParallelism);
+    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt, 
dataWriteParallelism, indexWriteParallelism);
   }
 
   public EventBuffer addEventToBuffer(WriteMetadataEvent event) {
@@ -148,17 +143,9 @@ public class EventBuffers implements Serializable {
     }
   }
 
-  public void awaitPrevInstantsToComplete(long checkpointId) {
-    List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
-    if (!pendingInstants.isEmpty() && 
this.indexBootstrapGuardOption.isPresent()) {
-      this.indexBootstrapGuardOption.get().blockFor(() -> 
getPendingInstantsBefore(checkpointId));
-    }
-  }
-
   public void reset(long checkpointId) {
     this.eventBuffers.remove(checkpointId);
     this.commitGuardOption.ifPresent(CommitGuard::unblock);
-    this.indexBootstrapGuardOption.ifPresent(CommitGuard::unblock);
   }
 
   public boolean nonEmpty() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 09c6e4dec448..47d1f8e9e716 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -286,9 +286,6 @@ public class Pipelines {
 
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
       boolean isRliBootstrap = OptionsResolver.isRecordLevelIndex(conf);
-      if (isRliBootstrap) {
-        conf.set(FlinkOptions.WRITE_OPERATOR_UID, 
Pipelines.opUID("stream_write", conf));
-      }
       dataStream1 = dataStream1
           .transform(
               "index_bootstrap",
@@ -407,9 +404,7 @@ public class Pipelines {
           throw new HoodieNotSupportedException("Unknown bucket index engine 
type: " + bucketIndexEngineType);
       }
     } else {
-      // if the index is RLI, the write operator UID will be pre-generated and 
set into the configuration.
-      String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
-      writeOperatorUid = writeOperatorUid == null ? opUID("stream_write", 
conf) : writeOperatorUid;
+      String writeOperatorUid = opUID("stream_write", conf);
       // uuid is used to generate operator id for the write operator, then the 
bucket assign operator can send
       // operator event to the coordinator of the write operator based on the 
operator id.
       // @see org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0d6a375bf7a6..a5ccb2ffb20f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -38,7 +38,6 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.muttley.AthenaIngestionGateway;
 import org.apache.hudi.sink.event.Correspondent;
@@ -672,33 +671,6 @@ public class TestStreamWriteOperatorCoordinator {
     assertEquals(instant2, inflightInstants.get(2L));
   }
 
-  @Test
-  void testHandleAwaitPendingInstantsRequest() throws Exception {
-    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
-    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
-    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
-    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
-    coordinator = createCoordinator(conf, 1);
-
-    Thread t = new Thread(() -> {
-      try {
-        CompletableFuture<CoordinationResponse> responseFuture =
-            
coordinator.handleCoordinationRequest(Correspondent.AwaitPendingInstantsRequest.getInstance(1));
-        Correspondent.AwaitPendingInstantsResponse response =
-            CoordinationResponseSerDe.unwrap(responseFuture.get());
-        assertNotNull(response);
-      } catch (Exception e) {
-        throw new HoodieException(e);
-      }
-    });
-    t.start();
-    // send a bootstrap event to unblock the simulated request from bootstrap 
operator.
-    WriteMetadataEvent event1 = createBootstrapEvent(0, 0, 
coordinator.getInstant(), "par1");
-    coordinator.handleEventFromOperator(0, event1);
-    t.join();
-  }
-
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index c753ba4e6dc1..5e4deee54c96 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,6 +31,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 
@@ -266,6 +269,54 @@ public class TestWriteMergeOnRead extends 
TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  public void testRecommitOnJobRestartTriggeringGlobalFailover() throws 
Exception {
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
+    conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+    conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
+    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3,par2, 
id4,par2,id4,Fabian,31,4,par2]");
+
+    TestHarness testHarness = preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART1)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent(1, "par1")
+        .consume(TestData.DATA_SET_PART3)
+        .checkpoint(2)
+        // both ckp-1 and ckp-2 are not committing
+        .assertNextEvent(1, "par2")
+        // then simulating restarting job manually, coordinator will reset to 
ckp-2
+        // and recommit write metadata for ckp-1
+        .restartCoordinator()
+        .subTaskFails(0, 0);
+
+    // global failover is not triggered now, since write metadata events from 
writer state are not recommitted.
+    testHarness.assertGlobalFailure(false);
+
+    testHarness.assertNextEvent();
+    // global failover is triggered now, since all the pending write metadata 
events are recommitted.
+    testHarness.assertGlobalFailure(true);
+
+    testHarness.subTaskFails(0, 0)
+        .checkIndexLoaded(
+            new HoodieKey("id1", "par1"),
+            new HoodieKey("id3", "par2"))
+        // insert another batch of data.
+        .consume(TestData.DATA_SET_PART4)
+        .checkpoint(3)
+        .assertNextEvent(1, "par2")
+        // write metadata will be committed for ckp-3
+        .checkpointComplete(3)
+        // there should be 3 rows and 2 partitions
+        .checkWrittenData(expected, 2)
+        .end();
+  }
+
   @Test
   public void testInsertDuplicateRecordsWithCDCMode() throws Exception {
     conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
new file mode 100644
index 000000000000..b9deee81adf0
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import 
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+
+/** A {@link StateSnapshotContext} for testing purposes. */
+public class MockStateSnapshotContext extends MockFunctionSnapshotContext 
implements StateSnapshotContext {
+  public MockStateSnapshotContext(long checkpointId) {
+    super(checkpointId);
+  }
+
+  @Override
+  public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() 
throws Exception {
+    throw new UnsupportedOperationException("Unsupported now.");
+  }
+
+  @Override
+  public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() 
throws Exception {
+    throw new UnsupportedOperationException("Unsupported now.");
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 254592556669..dac07150295e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -28,8 +28,10 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bootstrap.AbstractBootstrapOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
+import org.apache.hudi.sink.bootstrap.RLIBootstrapOperator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
@@ -59,6 +61,7 @@ import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
+import org.mockito.Mockito;
 
 import java.util.HashSet;
 import java.util.List;
@@ -66,7 +69,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -99,7 +104,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   /**
    * Function that load index in state.
    */
-  private BootstrapOperator bootstrapOperator;
+  private AbstractBootstrapOperator bootstrapOperator;
   /**
    * Function that assigns bucket ID.
    */
@@ -179,9 +184,12 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     bucketAssignerFunction.initializeState(this.stateInitializationContext);
 
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      bootstrapOperator = new BootstrapOperator(conf);
+      bootstrapOperator = isStreamingWriteIndexEnabled ? Mockito.spy(new 
RLIBootstrapOperator(conf)) : new BootstrapOperator(conf);
       CollectOutputAdapter<HoodieFlinkInternalRow> output = new 
CollectOutputAdapter<>();
       bootstrapOperator.setup(streamTask, streamConfig, output);
+      if (isStreamingWriteIndexEnabled) {
+        doReturn(runtimeContext).when(bootstrapOperator).getRuntimeContext();
+      }
       bootstrapOperator.initializeState(this.stateInitializationContext);
 
       Collector<HoodieFlinkInternalRow> collector = 
RecordsCollector.getInstance(rowType);
@@ -258,7 +266,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     // checkpoint the coordinator first
     checkpointCoordinator(checkpointId);
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      bootstrapOperator.snapshotState(null);
+      bootstrapOperator.snapshotState(new 
MockStateSnapshotContext(checkpointId));
     }
     bucketAssignerFunction.snapshotState(new 
MockFunctionSnapshotContext(checkpointId));
 
@@ -346,8 +354,12 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
     // reset the attempt number to simulate the task failover/retries
     this.runtimeContext.setAttemptNumber(attemptNumber);
+    this.bucketAssignFunctionContext.clear();
     setupWriteFunction();
     if (supportStreamingWriteIndex()) {
+      if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+        setupIndexBootstrapFunction();
+      }
       setupIndexWriteFunction();
     }
   }
@@ -384,7 +396,10 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public boolean isAlreadyBootstrap() throws Exception {
-    return this.bootstrapOperator.isAlreadyBootstrap();
+    if (this.bootstrapOperator instanceof BootstrapOperator) {
+      return ((BootstrapOperator) this.bootstrapOperator).isAlreadyBootstrap();
+    }
+    return this.bootstrapOperator != null;
   }
 
   // -------------------------------------------------------------------------
@@ -410,6 +425,27 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     indexWriteFunction.open(conf);
   }
 
+  private void setupIndexBootstrapFunction() {
+    bootstrapOperator = Mockito.spy(new RLIBootstrapOperator(conf));
+    CollectOutputAdapter<HoodieFlinkInternalRow> output = new 
CollectOutputAdapter<>();
+    bootstrapOperator.setup(streamTask, streamConfig, output);
+    doReturn(runtimeContext).when(bootstrapOperator).getRuntimeContext();
+    try {
+      // may block until the coordinator finishes committing the pending instants.
+      bootstrapOperator.initializeState(this.stateInitializationContext);
+
+      Collector<HoodieFlinkInternalRow> collector = 
RecordsCollector.getInstance(rowType);
+      for (HoodieFlinkInternalRow bootstrapRecord : output.getRecords()) {
+        
stateInitializationContext.getKeyedStateStore().setCurrentKey(bootstrapRecord.getRecordKey());
+        
when(context.getCurrentKey()).thenReturn(bootstrapRecord.getRecordKey());
+        bucketAssignerFunction.processElement(bootstrapRecord, context, 
collector);
+        
bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
+      }
+    } catch (Exception e) {
+      throw new CompletionException(e);
+    }
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
@@ -424,5 +460,9 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     public boolean isKeyInState(String key) {
       return this.updateKeys.contains(key);
     }
+
+    public void clear() {
+      this.updateKeys.clear();
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
index 40fe4e58edd0..c2c7f7d338f3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.sink.utils;
 
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 
 import org.apache.flink.configuration.Configuration;
@@ -29,7 +28,6 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -69,46 +67,6 @@ public class TestEventBuffers {
     }
   }
 
-  @Test
-  void testAwaitPrevInstantsWaitsUntilAllPreviousCheckpointsReset() throws 
Exception {
-    Configuration conf = new Configuration();
-    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
-    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
-    conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
-    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
-    EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);
-
-    eventBuffers.initNewEventBuffer(1L, "001");
-    eventBuffers.initNewEventBuffer(2L, "002");
-    eventBuffers.initNewEventBuffer(3L, "003");
-
-    ExecutorService executor = Executors.newSingleThreadExecutor();
-    try {
-      // two tasks block waiting on commit.
-      CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
-          () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
-      CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
-          () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
-
-      Thread.sleep(100);
-      assertFalse(waitingFuture1.isDone());
-      assertFalse(waitingFuture2.isDone());
-
-      eventBuffers.reset(1L);
-      Thread.sleep(100);
-      assertFalse(waitingFuture1.isDone(), "Checkpoint 2 still pending, should 
continue blocking");
-      assertFalse(waitingFuture2.isDone(), "Checkpoint 2 still pending, should 
continue blocking");
-
-      eventBuffers.reset(2L);
-      waitingFuture1.get(2, TimeUnit.SECONDS);
-      assertTrue(waitingFuture1.isDone());
-      waitingFuture2.get(2, TimeUnit.SECONDS);
-      assertTrue(waitingFuture2.isDone());
-    } finally {
-      executor.shutdownNow();
-    }
-  }
-
   private static WriteMetadataEvent newWriteEvent(long checkpointId, String 
instant) {
     return WriteMetadataEvent.builder()
         .taskID(0)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index c7ca2ee1b485..b4a265e2b252 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -483,6 +483,11 @@ public class TestWriteBase {
       return this;
     }
 
+    public TestHarness assertGlobalFailure(boolean failed) {
+      assertEquals(failed, 
this.pipeline.getCoordinatorContext().isJobFailed());
+      return this;
+    }
+
     public TestHarness checkpointThrows(long checkpointId, String message) {
       // this returns early because there is no inflight instant
       assertThrows(HoodieException.class, () -> checkpoint(checkpointId), 
message);
@@ -616,6 +621,14 @@ public class TestWriteBase {
       return this;
     }
 
+    public TestHarness checkIndexNotLoaded(HoodieKey... keys) {
+      for (HoodieKey key : keys) {
+        assertFalse(this.pipeline.isKeyInState(key),
+            "Key: " + key + " assumes to not in the index state");
+      }
+      return this;
+    }
+
     public TestHarness assertBootstrapped() throws Exception {
       assertTrue(this.pipeline.isAlreadyBootstrap());
       return this;


Reply via email to