This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f23fc298773 feat: Support bucket assign operator fetching inflight 
instants from coordinator (#17885)
8f23fc298773 is described below

commit 8f23fc298773f3beb67edda66757ab81698ccf8f
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Jan 19 12:48:26 2026 +0800

    feat: Support bucket assign operator fetching inflight instants from 
coordinator (#17885)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  3 +-
 .../apache/hudi/configuration/OptionsResolver.java |  2 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 20 +++++-
 .../org/apache/hudi/sink/event/Correspondent.java  | 42 +++++++++++++
 .../sink/partitioner/BucketAssignFunction.java     | 11 +++-
 .../partitioner/MiniBatchBucketAssignOperator.java | 25 +++++++-
 .../partitioner/MinibatchBucketAssignFunction.java | 11 +++-
 .../hudi/sink/partitioner/index/IndexBackend.java  |  7 ++-
 .../sink/partitioner/index/RecordIndexCache.java   | 10 ++-
 .../partitioner/index/RecordLevelIndexBackend.java |  6 +-
 .../org/apache/hudi/sink/utils/EventBuffers.java   |  7 +++
 .../hudi/sink/utils/OperatorIDGenerator.java       | 49 +++++++++++++++
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 12 ++--
 .../sink/TestStreamWriteOperatorCoordinator.java   | 41 ++++++++++++
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 19 ++++++
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  5 ++
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |  5 ++
 .../TestMinibatchBucketAssignFunction.java         | 72 ++++++++++++++--------
 .../index/TestRecordLevelIndexBackend.java         | 10 ++-
 .../apache/hudi/sink/utils/MockCorrespondent.java  | 12 ++++
 .../sink/utils/StreamWriteFunctionWrapper.java     | 11 +++-
 .../hudi/sink/utils/TestFunctionWrapper.java       |  8 +++
 .../org/apache/hudi/sink/utils/TestWriteBase.java  | 13 ++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  9 ++-
 24 files changed, 357 insertions(+), 53 deletions(-)
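
For readers skimming the diff, here is a minimal, self-contained sketch (not part of this patch) of the flow the changes implement: the coordinator keeps the inflight (checkpoint id -> instant time) pairs in its event buffers, the bucket assign operator requests that map when a checkpoint completes, and then drops record-index caches for checkpoints that are no longer inflight. Plain JDK collections stand in for the actual Flink/Hudi classes (EventBuffers, Correspondent, RecordIndexCache).

    import java.util.HashMap;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Toy model: the "coordinator" tracks inflight (checkpointId -> instantTime) pairs,
    // the "bucket assigner" prunes per-checkpoint index caches on checkpoint complete.
    public class InflightInstantsFlowSketch {

      // Stand-in for EventBuffers#getAllCheckpointIdAndInstants on the coordinator side.
      static Map<Long, String> requestInflightInstants(NavigableMap<Long, String> eventBuffers) {
        return new HashMap<>(eventBuffers);
      }

      public static void main(String[] args) {
        // Coordinator side: checkpoints 1 and 2 are still inflight.
        NavigableMap<Long, String> eventBuffers = new TreeMap<>();
        eventBuffers.put(1L, "20260119124800001");
        eventBuffers.put(2L, "20260119124800002");

        // Bucket-assigner side: per-checkpoint index caches (values are just labels here).
        NavigableMap<Long, String> caches = new TreeMap<>();
        caches.put(0L, "cache-for-ckp-0");
        caches.put(1L, "cache-for-ckp-1");
        caches.put(2L, "cache-for-ckp-2");

        // On notifyCheckpointComplete: fetch the inflight instants from the coordinator
        // and keep only the caches whose checkpoint id is still inflight.
        Map<Long, String> inflight = requestInflightInstants(eventBuffers);
        long minInflight = inflight.keySet().stream().min(Long::compareTo).orElse(Long.MAX_VALUE);
        caches.headMap(minInflight, false).clear();

        System.out.println("caches kept: " + caches.keySet()); // prints [1, 2]
      }
    }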

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 195db61e5d81..773c8721900f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -290,7 +290,8 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("The maximum number of input records can be buffered 
for miniBatch during record index lookup.\n"
           + "MiniBatch is an optimization to buffer input records to reduce 
the number of individual index lookups,\n"
           + "which can significantly improve performance compared to 
processing each record individually.\n"
-          + "Set to 0 to disable mini-batch processing.");
+          + "Default value is 1000, which is also the minimum value for the 
minibatch size, when the configured size\n"
+          + "is less than 1000, the default value will be used.");
 
   // ------------------------------------------------------------------------
   //  Read Options
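
The description above is backed by a clamp in MinibatchBucketAssignFunction further down in this patch (Math.max of the configured value and the default). A tiny illustration of that behavior, with a hypothetical constant standing in for FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue():

    // Illustration of the minimum mini-batch size described above; the constant is a
    // hypothetical stand-in for FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue().
    public class MiniBatchSizeClampSketch {
      private static final int DEFAULT_MINIBATCH_SIZE = 1000;

      // Mirrors the Math.max(...) clamping applied in MinibatchBucketAssignFunction.
      static int effectiveMiniBatchSize(int configured) {
        return Math.max(configured, DEFAULT_MINIBATCH_SIZE);
      }

      public static void main(String[] args) {
        System.out.println(effectiveMiniBatchSize(200));  // 1000: values below the floor fall back to the default
        System.out.println(effectiveMiniBatchSize(5000)); // 5000: larger values are kept as configured
      }
    }
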
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index ff9b519b60d3..6f083daac07a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -187,7 +187,7 @@ public class OptionsResolver {
    */
   public static boolean isMiniBatchBucketAssign(Configuration conf) {
     HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
-    return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX && 
conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE) > 0;
+    return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 2e8b9f8a97df..ced38c436cc8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -171,7 +171,7 @@ public class StreamWriteOperatorCoordinator
   protected NonThrownExecutor executor;
 
   /**
-   * A single-thread executor to handle the instant time request.
+   * A single-thread executor to handle coordination requests from operators.
    */
   protected NonThrownExecutor instantRequestExecutor;
 
@@ -372,10 +372,19 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public CompletableFuture<CoordinationResponse> 
handleCoordinationRequest(CoordinationRequest request) {
+    if (request instanceof Correspondent.InstantTimeRequest) {
+      return handleInstantRequest((Correspondent.InstantTimeRequest) request);
+    }
+    if (request instanceof Correspondent.InflightInstantsRequest) {
+      return 
handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
+    }
+    throw new HoodieException("Unexpected coordination request type: " + 
request.getClass().getSimpleName());
+  }
+
+  private CompletableFuture<CoordinationResponse> 
handleInstantRequest(Correspondent.InstantTimeRequest request) {
     CompletableFuture<CoordinationResponse> response = new 
CompletableFuture<>();
     instantRequestExecutor.execute(() -> {
-      Correspondent.InstantTimeRequest instantTimeRequest = 
(Correspondent.InstantTimeRequest) request;
-      long checkpointId = instantTimeRequest.getCheckpointId();
+      long checkpointId = request.getCheckpointId();
       Pair<String, WriteMetadataEvent[]> instantTimeAndEventBuffer = 
this.eventBuffers.getInstantAndEventBuffer(checkpointId);
       final String instantTime;
       if (instantTimeAndEventBuffer == null) {
@@ -391,6 +400,11 @@ public class StreamWriteOperatorCoordinator
     return response;
   }
 
+  private CompletableFuture<CoordinationResponse> 
handleInFlightInstantsRequest(Correspondent.InflightInstantsRequest request) {
+    CoordinationResponse coordinationResponse = 
Correspondent.InflightInstantsResponse.getInstance(eventBuffers.getAllCheckpointIdAndInstants());
+    return 
CompletableFuture.completedFuture(CoordinationResponseSerDe.wrap(coordinationResponse));
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
index 5176c5ec3e30..ea5fd557c8ee 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
@@ -31,6 +31,9 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.util.SerializedValue;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Correspondent between a write task with the coordinator.
  */
@@ -72,6 +75,19 @@ public class Correspondent {
     }
   }
 
+  /**
+   * Sends a request to the coordinator to fetch the inflight instants.
+   */
+  public Map<Long, String> requestInflightInstants() {
+    try {
+      InflightInstantsResponse response = 
CoordinationResponseSerDe.unwrap(this.gateway.sendRequestToCoordinator(this.operatorID,
+          new SerializedValue<>(InflightInstantsRequest.getInstance())).get());
+      return response.getInflightInstants();
+    } catch (Exception e) {
+      throw new HoodieException("Error requesting the inflight instants from the coordinator", e);
+    }
+  }
+
   /**
    * A request for instant time with a given checkpoint id.
    */
@@ -99,4 +115,30 @@ public class Correspondent {
       return new InstantTimeResponse(instant);
     }
   }
+
+  /**
+   * A request for the current inflight instants in the coordinator.
+   */
+  @AllArgsConstructor(access = AccessLevel.PRIVATE)
+  @Getter
+  public static class InflightInstantsRequest implements CoordinationRequest {
+
+    public static InflightInstantsRequest getInstance() {
+      return new InflightInstantsRequest();
+    }
+  }
+
+  /**
+   * A response with the inflight instants, keyed by checkpoint id.
+   */
+  @AllArgsConstructor(access = AccessLevel.PRIVATE)
+  @Getter
+  public static class InflightInstantsResponse implements CoordinationResponse 
{
+
+    private final HashMap<Long, String> inflightInstants;
+
+    public static InflightInstantsResponse getInstance(HashMap<Long, String> 
inflightInstants) {
+      return new InflightInstantsResponse(inflightInstants);
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 7a79526f9d49..e156ec7d5df2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.partitioner.index.IndexBackend;
 import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
 import org.apache.hudi.table.action.commit.BucketInfo;
@@ -38,6 +39,7 @@ import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.RuntimeContextUtils;
 
 import lombok.Getter;
+import lombok.Setter;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -91,6 +93,12 @@ public class BucketAssignFunction
 
   private final boolean isChangingRecords;
 
+  /**
+   * Correspondent to fetch the infight instants from coordinator.
+   */
+  @Setter
+  protected transient Correspondent correspondent;
+
   /**
    * If the index is global, update the index for the old partition path
    * if same key record with different partition path came in.
@@ -212,8 +220,7 @@ public class BucketAssignFunction
   public void notifyCheckpointComplete(long checkpointId) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reload(checkpointId);
-    // todo #17700: check the file based mapping between checkpoint id and 
instant to get the latest successful instant.
-    this.indexBackend.onCommitSuccess(checkpointId - 1);
+    this.indexBackend.onCheckpointComplete(this.correspondent);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
index e16cbb024788..1f97c10e0587 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
@@ -19,9 +19,15 @@
 package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.sink.event.Correspondent;
 
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 /**
  * An operator that performs mini-batch bucket assignment for incoming records.
@@ -36,17 +42,32 @@ import 
org.apache.flink.streaming.api.operators.ProcessOperator;
  */
 public class MiniBatchBucketAssignOperator extends 
ProcessOperator<HoodieFlinkInternalRow, HoodieFlinkInternalRow> implements 
BoundedOneInput {
 
-  /** The underlying function that performs the actual bucket assignment 
logic. */
+  /**
+   * The underlying function that performs the actual bucket assignment logic.
+   */
   private final MinibatchBucketAssignFunction bucketAssignFunction;
 
+  /**
+   * OperatorId for the data write operator.
+   */
+  private final OperatorID dataWriteOperatorId;
+
   /**
    * Constructs a MiniBatchBucketAssignOperator with the specified bucket 
assignment function.
    *
    * @param bucketAssignFunction the function responsible for performing the 
bucket assignment logic
    */
-  public MiniBatchBucketAssignOperator(MinibatchBucketAssignFunction 
bucketAssignFunction) {
+  public MiniBatchBucketAssignOperator(MinibatchBucketAssignFunction 
bucketAssignFunction, OperatorID dataWriteOperatorId) {
     super(bucketAssignFunction);
     this.bucketAssignFunction = bucketAssignFunction;
+    this.dataWriteOperatorId = dataWriteOperatorId;
+  }
+
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+    super.setup(containingTask, config, output);
+    
this.bucketAssignFunction.setCorrespondent(Correspondent.getInstance(dataWriteOperatorId,
+        
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway()));
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
index c40e5a43b101..70b09621aa4d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
@@ -21,9 +21,12 @@ package org.apache.hudi.sink.partitioner;
 import org.apache.hudi.adapter.ProcessFunctionAdapter;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.partitioner.index.MinibatchIndexBackend;
 
+import lombok.Getter;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
@@ -60,6 +63,8 @@ public class MinibatchBucketAssignFunction
   /**
    * The maximum number of input records can be buffered for MiniBatch.
    */
+  @VisibleForTesting
+  @Getter
   private final int miniBatchSize;
 
   /**
@@ -75,7 +80,7 @@ public class MinibatchBucketAssignFunction
     this.delegateFunction = new BucketAssignFunction(conf);
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
-    this.miniBatchSize = 
conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE);
+    this.miniBatchSize = 
Math.max(conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE), 
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue());
   }
 
   @Override
@@ -147,6 +152,10 @@ public class MinibatchBucketAssignFunction
     processBufferedRecords(outCollector);
   }
 
+  public void setCorrespondent(Correspondent correspondent) {
+    this.delegateFunction.setCorrespondent(correspondent);
+  }
+
   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // Refresh the table state when there are new commits.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
index e89bb4fe9ebe..74d98c46ab14 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink.partitioner.index;
 
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.sink.event.Correspondent;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -55,11 +56,11 @@ public interface IndexBackend extends Closeable {
   }
 
   /**
-   * Listener method called when the instant associated with {@code 
checkpointId} is committed successfully.
+   * Listener method called when the bucket assign operator receives a checkpoint-complete notification.
    *
-   * @param checkpointId checkpoint id.
+   * @param correspondent The Correspondent used to get inflight instants from 
the coordinator.
    */
-  default void onCommitSuccess(long checkpointId) {
+  default void onCheckpointComplete(Correspondent correspondent) {
     // do nothing.
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
index 6ee3084f5460..267025ba4519 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -117,8 +118,15 @@ public class RecordIndexCache implements Closeable {
    * @param checkpointId the id of checkpoint
    */
   public void clean(long checkpointId) {
+    NavigableMap<Long, ExternalSpillableMap<String, 
HoodieRecordGlobalLocation>> subMap;
+    if (checkpointId == Long.MAX_VALUE) {
+      // clean all the cache entries for old checkpoint ids, keeping only the cache for the maximum checkpoint id,
+      // which frees memory while still ensuring a certain cache hit rate
+      subMap = caches.firstEntry() == null ? Collections.emptyNavigableMap() : 
caches.tailMap(caches.firstKey(), false);
+    } else {
+      subMap = caches.tailMap(checkpointId, false);
+    }
     // Get all entries that are less than or equal to the given checkpointId
-    NavigableMap<Long, ExternalSpillableMap<String, 
HoodieRecordGlobalLocation>> subMap = caches.tailMap(checkpointId, false);
     // Close all the ExternalSpillableMap instances before removing them
     subMap.values().forEach(ExternalSpillableMap::close);
     // Remove all the entries from the main cache
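
A behavioral model of the clean() semantics in the hunk above (the real cache appears to be ordered newest-first, which is why tailMap is used there; this sketch uses a natural-ordered TreeMap for clarity and is not the actual RecordIndexCache):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Behavioral model of RecordIndexCache#clean as changed above (not the actual class).
    public class CacheCleanSketch {

      // Drops caches older than checkpointId; for Long.MAX_VALUE, keeps only the newest cache.
      static void clean(NavigableMap<Long, String> caches, long checkpointId) {
        if (checkpointId == Long.MAX_VALUE) {
          // no inflight instants: keep only the cache for the maximum checkpoint id
          if (!caches.isEmpty()) {
            caches.headMap(caches.lastKey(), false).clear();
          }
          return;
        }
        // otherwise drop every cache below the oldest inflight checkpoint id
        caches.headMap(checkpointId, false).clear();
      }

      public static void main(String[] args) {
        NavigableMap<Long, String> caches = new TreeMap<>();
        caches.put(1L, "c1");
        caches.put(2L, "c2");
        caches.put(3L, "c3");

        clean(caches, 2L);             // oldest inflight checkpoint is 2
        System.out.println(caches.keySet()); // [2, 3]

        clean(caches, Long.MAX_VALUE); // nothing inflight anymore
        System.out.println(caches.keySet()); // [3]
      }
    }
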
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index bcd0dcc4b7b1..68200bf3ea13 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 
 import lombok.Getter;
@@ -105,8 +106,9 @@ public class RecordLevelIndexBackend implements 
MinibatchIndexBackend {
   }
 
   @Override
-  public void onCommitSuccess(long checkpointId) {
-    recordIndexCache.clean(checkpointId);
+  public void onCheckpointComplete(Correspondent correspondent) {
+    Map<Long, String> inflightInstants = 
correspondent.requestInflightInstants();
+    
recordIndexCache.clean(inflightInstants.keySet().stream().min(Long::compareTo).orElse(Long.MAX_VALUE));
     this.metaClient.reloadActiveTimeline();
     reloadMetadataTable();
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index d4eb6a393883..889326126e10 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -124,6 +125,12 @@ public class EventBuffers implements Serializable {
     return this.eventBuffers.entrySet().stream();
   }
 
+  public HashMap<Long, String> getAllCheckpointIdAndInstants() {
+    HashMap<Long, String> result = new HashMap<>(eventBuffers.size());
+    this.eventBuffers.forEach((k, v) -> result.put(k, v.getLeft()));
+    return result;
+  }
+
   public void initNewEventBuffer(long checkpointId, String instantTime, int 
parallelism) {
     this.eventBuffers.put(checkpointId, Pair.of(instantTime, new 
WriteMetadataEvent[parallelism]));
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/OperatorIDGenerator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/OperatorIDGenerator.java
new file mode 100644
index 000000000000..a0ad18bdc68a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/OperatorIDGenerator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.shaded.curator5.com.google.common.hash.Hashing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Generates {@link OperatorID}s for communication between Flink operators.
+ *
+ * <p>Operator ID generation is an internal implementation detail of Flink, happening during the
+ * stream graph generation phase. The implementation here is the same as Flink's to make
+ * sure that operators can reach out to each other on the cluster.
+ *
+ * @see org.apache.flink.state.api.runtime.OperatorIDGenerator
+ */
+public class OperatorIDGenerator {
+  private OperatorIDGenerator() {
+  }
+
+  /**
+   * Generate {@link OperatorID}'s from {@code uid}'s.
+   *
+   * @param uid {@code DataStream} operator uid.
+   * @return corresponding {@link OperatorID}
+   */
+  public static OperatorID fromUid(String uid) {
+    byte[] hash = Hashing.murmur3_128(0).newHasher().putString(uid, 
UTF_8).hash().asBytes();
+    return new OperatorID(hash);
+  }
+}
\ No newline at end of file
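
The point of the generator above is that the OperatorID is a pure function of the uid string, so the bucket assign operator and the write operator's coordinator compute the same id independently. A standalone check of that determinism, assuming Guava (or Flink's shaded copy) on the classpath; the uid below is hypothetical:

    import static java.nio.charset.StandardCharsets.UTF_8;

    import com.google.common.hash.Hashing;
    import java.util.Arrays;

    // Determinism check for the uid -> hash mapping used by OperatorIDGenerator above.
    public class OperatorIdHashSketch {
      public static void main(String[] args) {
        String uid = "uid_stream_write"; // hypothetical; the real uid comes from opUID("stream_write", conf)
        byte[] first = Hashing.murmur3_128(0).newHasher().putString(uid, UTF_8).hash().asBytes();
        byte[] second = Hashing.murmur3_128(0).newHasher().putString(uid, UTF_8).hash().asBytes();
        // Same uid -> same 16-byte hash, so both operators derive the same OperatorID.
        System.out.println(Arrays.equals(first, second)); // true
        System.out.println(first.length);                 // 16
      }
    }
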
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5fbd3609abf6..ae0dce254222 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -384,7 +384,11 @@ public class Pipelines {
           throw new HoodieNotSupportedException("Unknown bucket index engine 
type: " + bucketIndexEngineType);
       }
     } else {
-      DataStream<HoodieFlinkInternalRow> bucketAssignStream = 
createBucketAssignStream(dataStream, conf, rowType);
+      // The uid is used to generate the operator id for the write operator, so that the bucket assign
+      // operator can send operator events to the coordinator of the write operator based on that operator id.
+      // @see org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway.
+      String writeOperatorUid = opUID("stream_write", conf);
+      DataStream<HoodieFlinkInternalRow> bucketAssignStream = 
createBucketAssignStream(dataStream, conf, rowType, writeOperatorUid);
       return bucketAssignStream
           // shuffle by fileId(bucket id)
           .keyBy(HoodieFlinkInternalRow::getFileId)
@@ -392,7 +396,7 @@ public class Pipelines {
               opName("stream_write", conf),
               TypeInformation.of(RowData.class),
               StreamWriteOperator.getFactory(conf, rowType))
-          .uid(opUID("stream_write", conf))
+          .uid(writeOperatorUid)
           .setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
     }
   }
@@ -406,7 +410,7 @@ public class Pipelines {
    * @return A DataStream of HoodieFlinkInternalRow records with bucket 
assignments
    */
   private static DataStream<HoodieFlinkInternalRow> createBucketAssignStream(
-      DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf, 
RowType rowType) {
+      DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf, 
RowType rowType, String writeOperatorUid) {
     String assignerOperatorName = "bucket_assigner";
     if (OptionsResolver.isMiniBatchBucketAssign(conf)) {
       return inputStream
@@ -414,7 +418,7 @@ public class Pipelines {
           .transform(
               assignerOperatorName,
               new HoodieFlinkInternalRowTypeInfo(rowType),
-              new MiniBatchBucketAssignOperator(new 
MinibatchBucketAssignFunction(conf)))
+              new MiniBatchBucketAssignOperator(new 
MinibatchBucketAssignFunction(conf), 
OperatorIDGenerator.fromUid(writeOperatorUid)))
           .uid(opUID(assignerOperatorName, conf))
           .setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
     } else {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 8a79c0ddbf11..9645e41d05d7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -48,6 +48,7 @@ import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -64,6 +65,7 @@ import org.slf4j.Logger;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -561,6 +563,45 @@ public class TestStreamWriteOperatorCoordinator {
     }
   }
 
+  @Test
+  void testHandleInFlightInstantsRequest() throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    coordinator = createCoordinator(conf, 2);
+
+    // Request an instant time to create an initial instant
+    String instant1 = requestInstantTime(1);
+    assertThat(instant1, not(is("")));
+
+    // Add some events to the buffer to simulate ongoing processing
+    OperatorEvent event1 = createOperatorEvent(0, 1, instant1, "par1", false, 
false, 0.1);
+    coordinator.handleEventFromOperator(0, event1);
+
+    // Request another instant time to create a second instant
+    String instant2 = requestInstantTime(2);
+    assertThat(instant2, not(is("")));
+
+    // Add more events for the second instant
+    OperatorEvent event2 = createOperatorEvent(1, 2, instant2, "par2", false, 
false, 0.2);
+    coordinator.handleEventFromOperator(1, event2);
+
+    // Call handleCoordinationRequest with InflightInstantsRequest
+    CompletableFuture<CoordinationResponse> responseFuture =
+        
coordinator.handleCoordinationRequest(Correspondent.InflightInstantsRequest.getInstance());
+
+    // Unwrap and verify the response
+    Correspondent.InflightInstantsResponse response =
+        CoordinationResponseSerDe.unwrap(responseFuture.get());
+
+    // Check that the response contains the expected checkpoint IDs and 
instant times
+    Map<Long, String> inflightInstants = response.getInflightInstants();
+    assertEquals(2, inflightInstants.size());
+    assertTrue(inflightInstants.containsKey(1L));
+    assertTrue(inflightInstants.containsKey(2L));
+    assertEquals(instant1, inflightInstants.get(1L));
+    assertEquals(instant2, inflightInstants.get(2L));
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 6d89f521e995..e7e1f5b523c8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -864,4 +864,23 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .checkWrittenData(EXPECTED1)
         .end();
   }
+
+  @Test
+  public void testCacheCleanOfRecordIndexBackend() throws Exception {
+    // use record level index
+    conf.set(FlinkOptions.INDEX_TYPE, GLOBAL_RECORD_LEVEL_INDEX.name());
+    
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
+    preparePipeline(conf)
+        // should be initialized with 1 inflight cache
+        .assertInflightCachesOfBucketAssigner(1)
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpoint(1)
+        // a new cache is created since the checkpoint id is now advanced to 1
+        .assertInflightCachesOfBucketAssigner(2)
+        .assertNextEvent(4, "par1,par2,par3,par4")
+        .checkpointComplete(1)
+        // clean the first inflight cache, leaving only the latest inflight cache.
+        .assertInflightCachesOfBucketAssigner(1)
+        .checkWrittenData(EXPECTED1);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index ff00ae50220d..d5056991469c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -258,6 +258,11 @@ public class TestWriteMergeOnRead extends 
TestWriteCopyOnWrite {
     // can be re-enabled after #17701
   }
 
+  @Test
+  public void testCacheCleanOfRecordIndexBackend() throws Exception {
+    // can be re-enabled after #17701
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 63d087c5b28c..d5ec424f28c3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -359,6 +359,11 @@ public class TestWriteMergeOnReadWithCompact extends 
TestWriteCopyOnWrite {
     // can be re-enabled after #17701
   }
 
+  @Test
+  public void testCacheCleanOfRecordIndexBackend() throws Exception {
+    // can be re-enabled after #17701
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index 06f8491165e2..6eb7fb16533f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.StringData;
@@ -58,8 +59,6 @@ public class TestMinibatchBucketAssignFunction {
     conf = TestConfigurations.getDefaultConf(basePath);
     
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
     conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
-    // Set minibatch size to 3 for testing
-    conf.set(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 3);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
   }
 
@@ -68,10 +67,19 @@ public class TestMinibatchBucketAssignFunction {
     // Create the MinibatchBucketAssignFunction
     MinibatchBucketAssignFunction function = new 
MinibatchBucketAssignFunction(conf);
     // Set up test harness
-    testHarness = new OneInputStreamOperatorTestHarness<>(new 
MiniBatchBucketAssignOperator(function), 1, 1, 0);
+    testHarness = new OneInputStreamOperatorTestHarness<>(new 
MiniBatchBucketAssignOperator(function, new OperatorID()), 1, 1, 0);
     testHarness.open();
   }
 
+  @Test
+  public void testMinibatchSize() {
+    Configuration config = Configuration.fromMap(conf.toMap());
+    config.set(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 200);
+    MinibatchBucketAssignFunction function = new 
MinibatchBucketAssignFunction(config);
+    // although the minibatch size is set to 200, the final value is 1000 since the minimum allowed minibatch size is 1000.
+    assertEquals(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue(), 
function.getMiniBatchSize());
+  }
+
   @Test
   public void testProcessElementWithBufferData() throws Exception {
     // Create test records
@@ -79,8 +87,6 @@ public class TestMinibatchBucketAssignFunction {
         insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), 
StringData.fromString("par1")));
     HoodieFlinkInternalRow record2 = new HoodieFlinkInternalRow("id2", "par1", 
"I",
         insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33, TimestampData.fromEpochMillis(2), 
StringData.fromString("par1")));
-    HoodieFlinkInternalRow record3 = new HoodieFlinkInternalRow("id10", 
"par5", "I",
-        insertRow(StringData.fromString("id10"), 
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3), 
StringData.fromString("par5")));
 
     // Process first two records - they should be buffered
     testHarness.processElement(new StreamRecord<>(record1));
@@ -90,18 +96,23 @@ public class TestMinibatchBucketAssignFunction {
     List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
     assertEquals(0, output.size(), "Records should be buffered until batch 
size is reached");
 
-    // Process third record - this should trigger processing of the first two 
buffered records
-    testHarness.processElement(new StreamRecord<>(record3));
+    for (int i = 0; i < 1000; i++) {
+      // Process 1000 more records to reach the mini-batch size and trigger flushing of the buffered records
+      String recordKey = "new_key_" + i;
+      HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(recordKey, 
"par5", "I",
+          insertRow(StringData.fromString(recordKey), 
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3), 
StringData.fromString("par5")));
+      testHarness.processElement(new StreamRecord<>(record));
+    }
 
     // Now we should have processed records
     output = testHarness.extractOutputValues();
-    assertEquals(3, output.size(), "All three records should be processed");
+    assertEquals(1000, output.size(), "The first mini-batch of 1000 records should be processed");
 
     // Verify that the records have proper bucket assignments
     for (HoodieFlinkInternalRow row : output) {
       // Check that file ID and instant time are assigned
       assertTrue(row.getFileId() != null && !row.getFileId().isEmpty(), "File 
ID should be assigned");
-      if (row.getRecordKey().equals("id10")) {
+      if (row.getRecordKey().startsWith("new_key")) {
         assertEquals("I", row.getInstantTime(), "the record is an insert 
record");
       } else {
         assertEquals("U", row.getInstantTime(), "the record is an update 
record");
@@ -111,31 +122,40 @@ public class TestMinibatchBucketAssignFunction {
 
   @Test
   public void testProcessElementWithIndexRecords() throws Exception {
-    // Create an index record (one that has isIndexRecord() returning true)
-    HoodieFlinkInternalRow indexRecord = new HoodieFlinkInternalRow("id1", 
"par1", "file_id1", "000");
-    // For now, let's test with a regular record
-    HoodieFlinkInternalRow record1 = new HoodieFlinkInternalRow("id1", "par1", 
"I",
-        insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), 
StringData.fromString("par1")));
-    HoodieFlinkInternalRow record2 = new HoodieFlinkInternalRow("id2", "par1", 
"I",
-        insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33, TimestampData.fromEpochMillis(2), 
StringData.fromString("par1")));
-    HoodieFlinkInternalRow record3 = new HoodieFlinkInternalRow("id3", "par2", 
"I",
-        insertRow(StringData.fromString("id3"), 
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3), 
StringData.fromString("par2")));
-    
-    testHarness.processElement(new StreamRecord<>(indexRecord));
+    // insert 500 index records
+    for (int i = 0; i < 500; i++) {
+      // index records are not buffered and are not emitted downstream
+      String recordKey = "index_new_key_" + i;
+      // Create an index record (one that has isIndexRecord() returning true)
+      HoodieFlinkInternalRow indexRecord = new 
HoodieFlinkInternalRow(recordKey, "par1", "file_id1", "000");
+      testHarness.processElement(new StreamRecord<>(indexRecord));
+    }
 
-    testHarness.processElement(new StreamRecord<>(record1));
-    testHarness.processElement(new StreamRecord<>(record2));
+    // insert 500 regular records
+    for (int i = 0; i < 500; i++) {
+      // regular records are buffered until the mini-batch size (1000) is reached
+      String recordKey = "new_key_" + i;
+      HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(recordKey, 
"par5", "I",
+          insertRow(StringData.fromString(recordKey), 
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3), 
StringData.fromString("par5")));
+      testHarness.processElement(new StreamRecord<>(record));
+    }
 
     // index records will not be buffered, so buffer will not be flushing
     List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
     assertEquals(0, output.size(), "Record should be buffered since batch size 
is 2");
-    
-    // Add another record to trigger buffer processing
-    testHarness.processElement(new StreamRecord<>(record3));
+
+    // insert another 500 regular records
+    for (int i = 0; i < 500; i++) {
+      // these additional records push the buffer to the mini-batch size and trigger a flush
+      String recordKey = "new_key_" + i;
+      HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(recordKey, 
"par5", "I",
+          insertRow(StringData.fromString(recordKey), 
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3), 
StringData.fromString("par5")));
+      testHarness.processElement(new StreamRecord<>(record));
+    }
 
     // the expected size is 3, without index record
     output = testHarness.extractOutputValues();
-    assertEquals(3, output.size(), "Both records should be processed");
+    assertEquals(1000, output.size(), "All regular records should be processed, index records are excluded");
   }
 
   @Test
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index 0b10bc4f4dc9..49d4370266e8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.partitioner.index;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -42,6 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Test cases for {@link RecordLevelIndexBackend}.
@@ -98,7 +102,11 @@ public class TestRecordLevelIndexBackend {
       assertEquals(newLocation, location);
 
       // previous instant commit success, clean
-      recordLevelIndexBackend.onCommitSuccess(1);
+      Correspondent correspondent = mock(Correspondent.class);
+      Map<Long, String> inflightInstants = new HashMap<>();
+      inflightInstants.put(1L, "0001");
+      
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+      recordLevelIndexBackend.onCheckpointComplete(correspondent);
       assertEquals(1, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
       // the cache will only contain 'new_key', others are cleaned.
       location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
index 6669ed9f5ace..7d4bb95416cc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
@@ -22,6 +22,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.Correspondent;
 
+import java.util.Map;
+
 /**
  * A mock {@link Correspondent} that always return the latest instant.
  */
@@ -40,4 +42,14 @@ public class MockCorrespondent extends Correspondent {
       throw new HoodieException("Error requesting the instant time from the 
coordinator", e);
     }
   }
+
+  @Override
+  public Map<Long, String> requestInflightInstants() {
+    try {
+      InflightInstantsResponse response = 
CoordinationResponseSerDe.unwrap(this.coordinator.handleCoordinationRequest(InflightInstantsRequest.getInstance()).get());
+      return response.getInflightInstants();
+    } catch (Exception e) {
+      throw new HoodieException("Error requesting the inflight instants from 
the coordinator", e);
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index bd4c5f7153b0..5ed0bc961de7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -83,6 +83,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   private final MockOperatorCoordinatorContext coordinatorContext;
   @Getter
   private StreamWriteOperatorCoordinator coordinator;
+  private MockCorrespondent correspondent;
   private final MockStateInitializationContext stateInitializationContext;
   private final TreeMap<Long, byte[]> coordinatorStateStore;
   private final KeyedProcessFunction.Context context;
@@ -134,6 +135,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
+    this.correspondent = new MockCorrespondent(coordinator);
     this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
     this.stateInitializationContext = new MockStateInitializationContext();
     this.coordinatorStateStore = new TreeMap<>();
@@ -158,6 +160,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
     bucketAssignerFunction = new BucketAssignFunction(conf);
     bucketAssignerFunction.setRuntimeContext(runtimeContext);
+    bucketAssignerFunction.setCorrespondent(correspondent);
     bucketAssignerFunction.open(conf);
     bucketAssignerFunction.initializeState(this.stateInitializationContext);
 
@@ -285,6 +288,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    this.correspondent = new MockCorrespondent(coordinator);
   }
 
   public void checkpointFails(long checkpointId) {
@@ -313,6 +317,11 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     return this.writeFunction;
   }
 
+  @Override
+  public BucketAssignFunction getBucketAssignFunction() {
+    return this.bucketAssignerFunction;
+  }
+
   public boolean isKeyInState(HoodieKey hoodieKey) {
     return 
this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
   }
@@ -331,7 +340,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
     writeFunction.open(conf);
-    writeFunction.setCorrespondent(new MockCorrespondent(this.coordinator));
+    writeFunction.setCorrespondent(correspondent);
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index b380879b3836..fdb173bc2fd5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -121,6 +122,13 @@ public interface TestFunctionWrapper<I> {
    */
   AbstractWriteFunction getWriteFunction();
 
+  /**
+   * Returns the bucket assign function.
+   */
+  default BucketAssignFunction getBucketAssignFunction() {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Returns the data buffer of the write task.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b59d32ebdbc0..b9e4847ca6ee 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -35,6 +35,8 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.partitioner.index.IndexBackend;
+import org.apache.hudi.sink.partitioner.index.RecordLevelIndexBackend;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -313,6 +315,17 @@ public class TestWriteBase {
       return this;
     }
 
+    /**
+     * Asserts the number of inflight caches in the record index cache of the bucket assigner.
+     */
+    public TestHarness assertInflightCachesOfBucketAssigner(int expected) {
+      IndexBackend indexBackend = 
pipeline.getBucketAssignFunction().getIndexBackend();
+      if (indexBackend instanceof RecordLevelIndexBackend) {
+        assertEquals(expected, ((RecordLevelIndexBackend) 
indexBackend).getRecordIndexCache().getCaches().size());
+      }
+      return this;
+    }
+
     /**
      * Checkpoints the pipeline, which triggers the data write and event send.
      */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 17bd2d77faa1..60a55a74f1a5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -94,6 +94,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
+import static java.util.Arrays.stream;
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
 import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
 import static org.apache.hudi.utils.TestConfigurations.catalog;
@@ -2903,14 +2904,12 @@ public class ITTestHoodieDataSource {
             + "+I[id3, id3, Julian, 43, 1970-01-01T00:00:03, par1, par1]]");
   }
 
-  @ParameterizedTest
-  @ValueSource(ints = {-1, 100})
-  void testMiniBatchBucketAssign(int bucketAssignMinibatchSize) throws 
Exception {
-    TableEnvironment tableEnv = batchTableEnv;
+  @Test
+  void testMiniBatchBucketAssign() throws Exception {
+    TableEnvironment tableEnv = streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
-        .option(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 
bucketAssignMinibatchSize)
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
         .option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
         .end();
