This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c2b401ed70ff feat(flink): Support bootstrap from RLI to local RocksDB 
for flink bucket assigner (#18254)
c2b401ed70ff is described below

commit c2b401ed70ff63ab26ceb0d776f29884ece781db
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Mar 24 16:18:37 2026 +0800

    feat(flink): Support bootstrap from RLI to local RocksDB for flink bucket 
assigner (#18254)
    
    * Support scanning all record index locations.
    * Support a RocksDB index backend.
    * Add a bootstrap operator for bootstrapping RLI into the RocksDB index backend.
    * Block RLI bootstrap on job restart/failover until all pending instants are committed successfully.
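    
    A minimal configuration sketch for the feature (the table path and RocksDB
    directory are placeholders; FlinkOptions.PATH and FlinkOptions.INDEX_TYPE are
    pre-existing options, the other two keys are introduced or used by this patch):
    
        import org.apache.flink.configuration.Configuration;
        import org.apache.hudi.configuration.FlinkOptions;
    
        public class RLIBootstrapConfigSketch {
          public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(FlinkOptions.PATH, "file:///tmp/hudi_table");           // placeholder table path
            conf.set(FlinkOptions.INDEX_TYPE, "GLOBAL_RECORD_LEVEL_INDEX");  // record level index
            conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);            // bootstrap RLI at startup
            conf.set(FlinkOptions.INDEX_BOOTSTRAP_ROCKSDB_PATH, "/tmp/rli"); // local RocksDB directory
          }
        }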
---
 .../run/strategy/JavaExecutionStrategy.java        |   2 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  56 +++++
 .../MultipleSparkJobExecutionStrategy.java         |   2 +-
 ...SparkJobConsistentHashingExecutionStrategy.java |   2 +-
 .../hudi/common/data/HoodieListPairData.java       |   8 +
 .../apache/hudi/common/data/HoodiePairData.java    |  11 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  67 ++++++
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  13 ++
 .../hudi/util}/LazyConcatenatingIterator.java      |   4 +-
 .../common/data/TestHoodieListDataPairData.java    |  18 ++
 .../TestHoodieBackedTableMetadataDataCleanup.java  |   2 +-
 .../hudi/util}/TestLazyConcatenatingIterator.java  |   3 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  18 ++
 .../apache/hudi/configuration/OptionsResolver.java |   4 +
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  13 ++
 .../sink/bootstrap/AbstractBootstrapOperator.java  |  83 ++++++++
 .../hudi/sink/bootstrap/BootstrapOperator.java     |  50 +----
 .../hudi/sink/bootstrap/RLIBootstrapOperator.java  | 232 +++++++++++++++++++++
 .../org/apache/hudi/sink/event/Correspondent.java  |  38 ++++
 .../partitioner/index/IndexBackendFactory.java     |  30 +--
 .../sink/partitioner/index/IndexWriteFunction.java |   1 +
 .../partitioner/index/RocksDBIndexBackend.java     |  53 +++++
 .../org/apache/hudi/sink/utils/CommitGuard.java    |  25 +++
 .../org/apache/hudi/sink/utils/EventBuffers.java   |  22 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  20 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |   4 +
 .../org/apache/hudi/util/FlinkWriteClients.java    |   4 +
 .../sink/TestStreamWriteOperatorCoordinator.java   |  28 +++
 .../partitioner/index/TestRocksDBIndexBackend.java |  53 +++++
 .../apache/hudi/sink/utils/TestCommitGuard.java    |  78 +++++++
 .../apache/hudi/sink/utils/TestEventBuffers.java   | 121 +++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  26 ++-
 .../apache/hudi/utils/TestFlinkWriteClients.java   |  17 ++
 33 files changed, 1032 insertions(+), 76 deletions(-)

diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index f0a0cf1c47df..279bda6358fc 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.JavaTaskContextSupplier;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.util.LazyConcatenatingIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index e1917cc7cc35..9563439c39eb 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -170,6 +170,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -1292,6 +1293,61 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testReadRecordIndexLocationsByBucketId() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieEngineContext engineContext = new 
HoodieJavaEngineContext(storageConf);
+
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableGlobalRecordLevelIndex(true)
+            .withRecordIndexFileGroupCount(3, 3)
+            .build())
+        .build();
+
+    try (HoodieJavaWriteClient client = new 
HoodieJavaWriteClient(engineContext, writeConfig)) {
+      String instantTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 120);
+      List<WriteStatus> writeStatuses = client.insert(records, instantTime);
+      client.commit(instantTime, writeStatuses);
+      assertNoWriteErrors(writeStatuses);
+
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) 
metadata(client);
+      int bucketCount = 
metadataReader.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+      assertEquals(3, bucketCount);
+
+      long totalRecords = metadataReader.readRecordIndexLocations(fileSlices 
-> fileSlices).count();
+      assertEquals(records.size(), totalRecords);
+    }
+  }
+
+  @Test
+  public void 
testReadRecordIndexLocationsByBucketIdFailsWhenRecordIndexDisabled() throws 
Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieEngineContext engineContext = new 
HoodieJavaEngineContext(storageConf);
+
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableGlobalRecordLevelIndex(false)
+            .build())
+        .build();
+
+    try (HoodieJavaWriteClient client = new 
HoodieJavaWriteClient(engineContext, writeConfig)) {
+      String instantTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 20);
+      List<WriteStatus> writeStatuses = client.insert(records, instantTime);
+      client.commit(instantTime, writeStatuses);
+      assertNoWriteErrors(writeStatuses);
+
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) 
metadata(client);
+      IllegalStateException exception =
+          assertThrows(IllegalStateException.class, () -> 
metadataReader.readRecordIndexLocations(fileSlices -> fileSlices));
+      assertTrue(exception.getMessage().contains("Record index is not 
initialized in MDT"));
+    }
+  }
+
   // Some operations are not feasible with test table infra. hence using write 
client to test those cases.
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 9976793199d8..c29fc9b926d9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.util.LazyConcatenatingIterator;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index 960107d06547..7b49e7972f47 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -20,7 +20,7 @@ package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.util.LazyConcatenatingIterator;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.engine.TaskContextSupplier;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
index 66e39d28b38a..1ade14e3e107 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Spliterator;
 import java.util.Spliterators;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -85,6 +86,13 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
     return collectAsList();
   }
 
+  @Override
+  public void forEach(Consumer<Pair<K, V>> consumer) {
+    try (Stream<Pair<K, V>> stream = asStream()) {
+      stream.sequential().forEach(consumer);
+    }
+  }
+
   @Override
   public void persist(String cacheConfig) {
     // no-op
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index bc8e309dd728..a6e79f0b2030 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * An abstraction for pairs of key in type K and value in type V to store the 
reference
@@ -169,6 +170,16 @@ public interface HoodiePairData<K, V> extends Serializable 
{
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * Applies the given action to each pair in this dataset.
+   *
+   * <p>Implementations may execute this action in a streaming manner and 
avoid materializing
+   * the full dataset into memory.
+   */
+  default void forEach(Consumer<Pair<K, V>> consumer) {
+    collectAsList().forEach(consumer);
+  }
+
   /**
    * @return the deduce number of shuffle partitions
    */
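
A minimal usage sketch for the new HoodiePairData#forEach default, assuming an
eager list-backed dataset; the pairs and the printing consumer are illustrative:

    import org.apache.hudi.common.data.HoodieListPairData;
    import org.apache.hudi.common.data.HoodiePairData;
    import org.apache.hudi.common.util.collection.Pair;

    import java.util.Arrays;

    public class ForEachSketch {
      public static void main(String[] args) {
        // Iterate the pairs through the new forEach hook instead of collectAsList().
        HoodiePairData<String, Integer> pairs = HoodieListPairData.eager(
            Arrays.asList(Pair.of("a", 1), Pair.of("b", 2)));
        pairs.forEach(p -> System.out.println(p.getLeft() + " -> " + p.getRight()));
      }
    }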
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d4d50c52fafb..304ea39ffe23 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -72,6 +72,7 @@ import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.LazyConcatenatingIterator;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -91,6 +92,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -355,6 +357,71 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     });
   }
 
+  @Override
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexLocations(
+      SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>> 
fileSlicesFilter) {
+    
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+        "Record index is not initialized in MDT");
+
+    return dataCleanupManager.ensureDataCleanupOnException(v -> {
+      // Get all file slices for the record index partition
+      List<FileSlice> fileSlices = partitionFileSliceMap.computeIfAbsent(
+          RECORD_INDEX.getPartitionPath(),
+          k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
+              metadataMetaClient, getMetadataFileSystemView(), 
RECORD_INDEX.getPartitionPath()));
+
+      List<FileSlice> targetFileSlices = fileSlicesFilter.apply(fileSlices);
+      if (targetFileSlices.isEmpty()) {
+        return HoodieListPairData.eager(Collections.emptyList());
+      }
+
+      List<Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>> 
iteratorSuppliers =
+          targetFileSlices.stream().map(targetFileSlice -> {
+            return new 
Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>() {
+              @Override
+              public ClosableIterator<HoodieRecord<HoodieMetadataPayload>> 
get() {
+                return scanRecordsItr(
+                    targetFileSlice,
+                    metadataRecord -> {
+                      HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
+                      String rowKey = payload.key != null ? payload.key : 
metadataRecord.get(KEY_FIELD_NAME).toString();
+                      HoodieKey hoodieKey = new HoodieKey(rowKey, 
RECORD_INDEX.getPartitionPath());
+                      return new HoodieAvroRecord<>(hoodieKey, payload);
+                    });
+              }
+            };
+          }).collect(Collectors.toList());
+
+      HoodieData<HoodieRecord<HoodieMetadataPayload>> allRecords =
+          HoodieListData.lazy(new 
LazyConcatenatingIterator<>(iteratorSuppliers)).filter(r -> 
!r.getData().isDeleted());
+      // Map records to key-location pairs
+      return allRecords.mapToPair(record -> {
+        String recordKey = record.getRecordKey();
+        HoodieRecordGlobalLocation location = 
record.getData().getRecordGlobalLocation();
+        return Pair.of(recordKey, location);
+      });
+    });
+  }
+
+  /**
+   * Helper method to read all records from a file slice without key filtering
+   */
+  private ClosableIterator<HoodieRecord<HoodieMetadataPayload>> scanRecordsItr(
+      FileSlice fileSlice,
+      SerializableFunctionUnchecked<GenericRecord, 
HoodieRecord<HoodieMetadataPayload>> transformer) {
+    // Read all records from the file slice without any key filtering
+    // This bypasses the normal predicate building mechanism
+    try {
+      ClosableIterator<IndexedRecord> rawIterator = 
readSliceWithFilter(Predicates.alwaysTrue(), fileSlice);
+      return new CloseableMappingIterator<>(rawIterator, record -> {
+        GenericRecord metadataRecord = (GenericRecord) record;
+        return transformer.apply(metadataRecord);
+      });
+    } catch (IOException e) {
+      throw new HoodieIOException("Error reading all records from metadata 
table file slice", e);
+    }
+  }
+
   /**
    * Reads record keys from record-level index. Deleted records are filtered 
out.
    * <p>
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 1da1f77400e6..98bd44914c72 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
@@ -258,6 +259,18 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
     return readRecordIndexLocationsWithKeys(recordKeys).values();
   }
 
+  /**
+   * Returns the location of record keys for file slices filtered by the given 
file slice filter.
+   * <p>
+   * If the Metadata Table is not enabled, an exception is thrown to 
distinguish this from the absence of the key.
+   *
+   * @param fileSlicesFilter The file slices filter function
+   * @return Pairs of (record key, location of record) from the filtered 
record index file slices.
+   */
+  default HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexLocations(SerializableFunctionUnchecked<List<FileSlice>, 
List<FileSlice>> fileSlicesFilter) {
+    throw new 
UnsupportedOperationException("readRecordIndexLocations(fileSlicesFilter) is 
not supported by this implementation");
+  }
+
   /**
    * Returns pairs of (secondary key, location of secondary key) which the 
provided secondary keys maps to.
    * Records that are not found are ignored and won't be part of map object 
that is returned.
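
A hedged sketch of consuming the new file-slice-filtered scan; it assumes the
metadata reader is a HoodieBackedTableMetadata (the interface default throws
otherwise) and uses the identity filter from the tests above:

    import org.apache.hudi.common.data.HoodiePairData;
    import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
    import org.apache.hudi.metadata.HoodieTableMetadata;

    public class RecordIndexScanSketch {
      // Counts every (record key, global location) pair in the record index,
      // reading all RLI file slices via the identity filter.
      static long countAllLocations(HoodieTableMetadata metadata) {
        HoodiePairData<String, HoodieRecordGlobalLocation> locations =
            metadata.readRecordIndexLocations(fileSlices -> fileSlices);
        return locations.count();
      }
    }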
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
 b/hudi-common/src/main/java/org/apache/hudi/util/LazyConcatenatingIterator.java
similarity index 96%
rename from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
rename to 
hudi-common/src/main/java/org/apache/hudi/util/LazyConcatenatingIterator.java
index 048c315f276b..0d3358b9f02e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/util/LazyConcatenatingIterator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.utils;
+package org.apache.hudi.util;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
  * Provides iterator interface over List of iterators. Consumes all records 
from first iterator element
  * before moving to next iterator in the list. That is concatenating elements 
across multiple iterators.
  *
- * <p>Different with {@link ConcatenatingIterator}, the internal iterators are 
instantiated lazily.
+ * <p>Different from {@code ConcatenatingIterator}, the internal iterators are instantiated lazily.
  */
 public class LazyConcatenatingIterator<T> implements ClosableIterator<T> {
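
A small sketch of the lazy behavior after the move to org.apache.hudi.util: the
second supplier is only invoked once the first iterator is exhausted. The plain
wrapper below is illustrative and not part of the patch:

    import org.apache.hudi.common.util.collection.ClosableIterator;
    import org.apache.hudi.util.LazyConcatenatingIterator;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Supplier;

    public class LazyConcatSketch {
      // Wraps a plain list iterator as a ClosableIterator for this sketch.
      static Supplier<ClosableIterator<Integer>> supplierOf(List<Integer> values) {
        return () -> {
          Iterator<Integer> iter = values.iterator();
          return new ClosableIterator<Integer>() {
            @Override public boolean hasNext() { return iter.hasNext(); }
            @Override public Integer next() { return iter.next(); }
            @Override public void close() { /* nothing to release here */ }
          };
        };
      }

      public static void main(String[] args) {
        // The supplier for (3, 4) is not called until 1 and 2 have been consumed.
        try (LazyConcatenatingIterator<Integer> it = new LazyConcatenatingIterator<>(
            Arrays.asList(supplierOf(Arrays.asList(1, 2)), supplierOf(Arrays.asList(3, 4))))) {
          while (it.hasNext()) {
            System.out.println(it.next());
          }
        }
      }
    }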
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 90e7873a2fef..d14465c4bc65 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -308,6 +309,23 @@ public class TestHoodieListDataPairData {
     assertEquals(expected, result, "Join result does not match expected 
output");
   }
 
+  @Test
+  public void testForEachOnUnion() {
+    HoodiePairData<Integer, Integer> left = HoodieListPairData.lazy(
+        IntStream.range(0, 1000).mapToObj(i -> Pair.of(i, 
i)).collect(Collectors.toList()));
+    HoodiePairData<Integer, Integer> right = HoodieListPairData.lazy(
+        IntStream.range(1000, 2000).mapToObj(i -> Pair.of(i, 
i)).collect(Collectors.toList()));
+
+    HoodiePairData<Integer, Integer> unionData = left.union(right);
+    List<Pair<Integer, Integer>> actual = new ArrayList<>();
+    unionData.forEach(actual::add);
+
+    List<Pair<Integer, Integer>> expected = IntStream.range(0, 2000)
+        .mapToObj(i -> Pair.of(i, i))
+        .collect(Collectors.toList());
+    assertEquals(expected, actual);
+  }
+
   @Test
   public void testFilter() {
     // Test filtering by key
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
index d899f68685de..03d013f4c586 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
@@ -193,7 +193,7 @@ public class TestHoodieBackedTableMetadataDataCleanup {
       // Setup mock behavior for V2 path
       when(mockPairData.values()).thenReturn(mockHoodieData);
       when(mockMetadata.readSecondaryIndexDataTableRecordKeysV2(any(), 
anyString())).thenReturn(mockHoodieData);
-      
when(mockMetadata.readRecordIndexLocations(any())).thenReturn(mockHoodieData);
+      
when(mockMetadata.readRecordIndexLocations(any(HoodieData.class))).thenReturn(mockHoodieData);
       
       // Call real method on the mock
       when(mockMetadata.readSecondaryIndexLocations(secondaryKeys, 
partitionName)).thenCallRealMethod();
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
 
b/hudi-common/src/test/java/org/apache/hudi/util/TestLazyConcatenatingIterator.java
similarity index 97%
rename from 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
rename to 
hudi-common/src/test/java/org/apache/hudi/util/TestLazyConcatenatingIterator.java
index fa1a37d02776..dac89c15e229 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/util/TestLazyConcatenatingIterator.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utils;
+package org.apache.hudi.util;
 
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 93f11df956f9..f2907b06d66b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.sink.buffer.BufferMemoryType;
@@ -274,6 +275,15 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(0D)
       .withDescription("Index state ttl in days, default stores the index 
permanently");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> INDEX_BOOTSTRAP_ROCKSDB_PATH = 
ConfigOptions
+      .key("index.bootstrap.rocksdb.path")
+      .stringType()
+      .defaultValue(FileIOUtils.getDefaultSpillableMapBasePath())
+      .withDescription("Local directory path for RocksDB when "
+          + "bootstrap is enabled for the record level index type. "
+          + "Each task manager creates a unique subdirectory under this path.");
+
   @AdvancedConfig
   public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = 
ConfigOptions
       .key("index.global.enabled")
@@ -898,6 +908,14 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(ClientIds.INIT_CLIENT_ID)
       .withDescription("Unique identifier used to distinguish different writer 
pipelines for concurrent mode");
 
+  // this is only for internal use
+  @AdvancedConfig
+  public static final ConfigOption<String> WRITE_OPERATOR_UID = ConfigOptions
+      .key("write.operator.uid")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The write operator uid used as the uid for hudi sink 
transformation.");
+
   // ------------------------------------------------------------------------
   //  Compaction Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 57f031bbd85e..398ca329dbb4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -205,6 +205,10 @@ public class OptionsResolver {
     return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
   }
 
+  public static boolean isRLIWithBootstrap(Configuration conf) {
+    return isRecordLevelIndex(conf) && 
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+  }
+
   /**
    * Returns whether it is a MERGE_ON_READ table, and updates by bucket index.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 5e0677c586d8..88f48c1863ec 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -390,6 +390,9 @@ public class StreamWriteOperatorCoordinator
     if (request instanceof Correspondent.InflightInstantsRequest) {
       return 
handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
     }
+    if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
+      return 
handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest) 
request);
+    }
     throw new HoodieException("Unexpected coordination request type: " + 
request.getClass().getSimpleName());
   }
 
@@ -417,6 +420,16 @@ public class StreamWriteOperatorCoordinator
     return 
CompletableFuture.completedFuture(CoordinationResponseSerDe.wrap(coordinationResponse));
   }
 
+  private CompletableFuture<CoordinationResponse> 
handleAwaitPendingInstantsRequest(Correspondent.AwaitPendingInstantsRequest 
request) {
+    CompletableFuture<CoordinationResponse> response = new 
CompletableFuture<>();
+    instantRequestExecutor.execute(() -> {
+      // wait until all pending instants before this checkpoint are committed.
+      eventBuffers.awaitPrevInstantsToComplete(request.getCheckpointId());
+      
response.complete(CoordinationResponseSerDe.wrap(Correspondent.AwaitPendingInstantsResponse.getInstance()));
+    }, "await pending instants to complete");
+    return response;
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java
new file mode 100644
index 000000000000..6e6e8436d748
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base operator for bootstrap stages that emit preloaded index records.
+ */
+@Slf4j
+public abstract class AbstractBootstrapOperator
+    extends AbstractStreamOperator<HoodieFlinkInternalRow>
+    implements OneInputStreamOperator<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> {
+
+  protected final Configuration conf;
+
+  protected AbstractBootstrapOperator(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * The modifier of this method was changed to `protected` since Flink 2.0; here we override the method
+   * with a `public` modifier to keep it compatible with its usage in the hudi-flink module.
+   */
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+    super.setup(containingTask, config, output);
+  }
+
+  @Override
+  public void processElement(StreamRecord<HoodieFlinkInternalRow> element) 
throws Exception {
+    output.collect(element);
+  }
+
+  protected void waitForBootstrapReady(int taskID) {
+    GlobalAggregateManager aggregateManager = 
getRuntimeContext().getGlobalAggregateManager();
+    int taskNum = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+    int readyTaskNum = 1;
+    while (taskNum != readyTaskNum) {
+      try {
+        readyTaskNum = aggregateManager.updateGlobalAggregate(
+            BootstrapAggFunction.NAME + conf.get(FlinkOptions.TABLE_NAME),
+            taskID,
+            new BootstrapAggFunction());
+        log.info("Waiting for other bootstrap tasks to complete, taskId = {}, 
ready = {}/{}", taskID, readyTaskNum, taskNum);
+        TimeUnit.SECONDS.sleep(5);
+      } catch (Exception e) {
+        log.error("Updating global task bootstrap summary failed", e);
+      }
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 5cbd581819d0..a3213ea59421 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -35,7 +35,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
@@ -54,20 +53,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.table.data.RowData;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static java.util.stream.Collectors.toList;
@@ -84,18 +76,13 @@ import static 
org.apache.hudi.util.StreamerUtil.metadataConfig;
  */
 @Slf4j
 public class BootstrapOperator
-    extends AbstractStreamOperator<HoodieFlinkInternalRow>
-    implements OneInputStreamOperator<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> {
+    extends AbstractBootstrapOperator {
 
   protected HoodieTable<?, ?, ?, ?> hoodieTable;
 
-  protected final Configuration conf;
-
   protected transient org.apache.hadoop.conf.Configuration hadoopConf;
   protected transient HoodieWriteConfig writeConfig;
 
-  private transient GlobalAggregateManager aggregateManager;
-
   private transient ListState<String> instantState;
   private transient HoodieTableMetaClient metaClient;
   private transient InternalSchemaManager internalSchemaManager;
@@ -104,19 +91,10 @@ public class BootstrapOperator
   private String lastInstantTime;
 
   public BootstrapOperator(Configuration conf) {
-    this.conf = conf;
+    super(conf);
     this.pattern = 
Pattern.compile(conf.get(FlinkOptions.INDEX_PARTITION_REGEX));
   }
 
-  /**
-   * The modifier of this method is updated to `protected` sink Flink 2.0, 
here we overwrite the method
-   * with `public` modifier to make it compatible considering usage in 
hudi-flink module.
-   */
-  @Override
-  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
-    super.setup(containingTask, config, output);
-  }
-
   @Override
   public void snapshotState(StateSnapshotContext context) throws Exception {
     lastInstantTime = 
StreamerUtil.getLastCompletedInstant(StreamerUtil.createMetaClient(this.conf));
@@ -146,7 +124,6 @@ public class BootstrapOperator
     this.writeConfig = FlinkWriteClients.getHoodieClientConfig(
         this.conf, false, !OptionsResolver.isIncrementalJobGraph(conf));
     this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, 
getRuntimeContext());
-    this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
     this.metaClient = StreamerUtil.createMetaClient(conf, hadoopConf);
     this.internalSchemaManager = 
InternalSchemaManager.get(hoodieTable.getStorageConf(), metaClient);
 
@@ -173,29 +150,6 @@ public class BootstrapOperator
     hoodieTable = null;
   }
 
-  /**
-   * Wait for other bootstrap tasks to finish the index bootstrap.
-   */
-  private void waitForBootstrapReady(int taskID) {
-    int taskNum = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
-    int readyTaskNum = 1;
-    while (taskNum != readyTaskNum) {
-      try {
-        readyTaskNum = 
aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME + 
conf.get(FlinkOptions.TABLE_NAME), taskID, new BootstrapAggFunction());
-        log.info("Waiting for other bootstrap tasks to complete, taskId = 
{}.", taskID);
-
-        TimeUnit.SECONDS.sleep(5);
-      } catch (Exception e) {
-        log.error("Updating global task bootstrap summary failed", e);
-      }
-    }
-  }
-
-  @Override
-  public void processElement(StreamRecord<HoodieFlinkInternalRow> element) 
throws Exception {
-    output.collect(element);
-  }
-
   /**
    * Loads all the indices of give partition path into the backup state.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
new file mode 100644
index 000000000000..55b5d65cf232
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.utils.OperatorIDGenerator;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Bootstrap operator that loads record level index (RLI) data from metadata 
table.
+ *
+ * <p>This operator reads index data from the record_index partition of the 
metadata table.
+ * Each subtask reads one RLI partition (bucket) based on its task index, 
enabling parallel loading.
+ *
+ * <p>The loaded index records are emitted downstream to initialize the index 
state in
+ * {@link org.apache.hudi.sink.partitioner.BucketAssignFunction}.
+ */
+@Slf4j
+public class RLIBootstrapOperator
+    extends AbstractBootstrapOperator {
+
+  private final OperatorID dataWriteOperatorId;
+
+  private transient HoodieTableMetaClient metaClient;
+  private transient HoodieBackedTableMetadata metadataTable;
+  private transient Correspondent correspondent;
+  private transient long loadedCnt;
+
+  /**
+   * The last checkpoint id, starts from -1.
+   */
+  private long checkpointId = -1;
+
+  /**
+   * List state of the JobID.
+   */
+  private transient ListState<JobID> jobIdState;
+
+  public RLIBootstrapOperator(Configuration conf) {
+    super(conf);
+    String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
+    ValidationUtils.checkArgument(writeOperatorUid != null,
+        "Write operator UID should not be null when index is Record Level 
Index.");
+    this.dataWriteOperatorId = OperatorIDGenerator.fromUid(writeOperatorUid);
+  }
+
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+    super.setup(containingTask, config, output);
+    this.correspondent = Correspondent.getInstance(dataWriteOperatorId,
+        
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway());
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    this.jobIdState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "job-id-state",
+            TypeInformation.of(JobID.class)
+        ));
+    loadedCnt = 0;
+
+    int attemptId = RuntimeContextUtils.getAttemptNumber(getRuntimeContext());
+    if (context.isRestored()) {
+      initCheckpointId(attemptId, 
context.getRestoredCheckpointId().orElse(-1L));
+    }
+
+    if (context.isRestored()) {
+      // Wait for pending instants being committed successfully before loading 
the record index
+      log.info("Waiting for pending instants committed before RLI bootstrap.");
+      correspondent.awaitPendingInstantsCommitted(checkpointId);
+      log.info("All pending instants are completed, continue RLI bootstrap.");
+    }
+
+    this.metaClient = StreamerUtil.createMetaClient(conf);
+    this.metadataTable = (HoodieBackedTableMetadata) 
metaClient.getTableFormat().getMetadataFactory().create(
+        HoodieFlinkEngineContext.DEFAULT,
+        metaClient.getStorage(),
+        StreamerUtil.metadataConfig(conf),
+        conf.get(FlinkOptions.PATH));
+    // Load RLI records
+    preLoadRLIRecords();
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    // Reload the job ID state
+    reloadJobIdState();
+    // Update checkpoint id
+    this.checkpointId = context.getCheckpointId();
+  }
+
+  @Override
+  public void close() throws Exception {
+    closeMetadataTable();
+    super.close();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * Reload the job id state as the current job id.
+   */
+  private void reloadJobIdState() throws Exception {
+    this.jobIdState.clear();
+    this.jobIdState.add(RuntimeContextUtils.getJobId(getRuntimeContext()));
+  }
+
+  private void initCheckpointId(int attemptId, long restoredCheckpointId) 
throws Exception {
+    if (attemptId <= 0) {
+      // returns early if the job/task is initially started.
+      return;
+    }
+    JobID currentJobId = RuntimeContextUtils.getJobId(getRuntimeContext());
+    if (StreamSupport.stream(this.jobIdState.get().spliterator(), false)
+        .noneMatch(currentJobId::equals)) {
+      // do not set up the checkpoint id if the state comes from the old job.
+      return;
+    }
+    this.checkpointId = restoredCheckpointId;
+  }
+
+  private void preLoadRLIRecords() {
+    int taskID = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+    int parallelism = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+
+    log.info("Start loading RLI records from metadata table, taskId = {}, 
parallelism = {}", taskID, parallelism);
+
+    SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>> 
fileSlicesFilter = fileSlices -> {
+      List<FileSlice> filteredFileSlices = new ArrayList<>();
+      for (int i = 0; i < fileSlices.size(); i++) {
+        if (shouldLoadBucket(i, parallelism, taskID)) {
+          filteredFileSlices.add(fileSlices.get(i));
+        }
+      }
+      log.info("Subtask: {} will load record index records from file groups: 
{}, total file groups: {}.",
+          taskID, 
filteredFileSlices.stream().map(FileSlice::getFileId).collect(Collectors.joining(",")),
 fileSlices.size());
+      return filteredFileSlices;
+    };
+
+    // Each subtask loads buckets assigned to it
+    long startTime = System.currentTimeMillis();
+    HoodiePairData<String, HoodieRecordGlobalLocation> rliData = 
metadataTable.readRecordIndexLocations(fileSlicesFilter);
+    rliData.forEach(locationPair -> emitIndexRecord(locationPair.getLeft(), 
locationPair.getRight()));
+    long costMs = System.currentTimeMillis() - startTime;
+    log.info("Finish loading RLI records, total records: {}, cost: {} ms, 
taskId = {}", loadedCnt, costMs, taskID);
+
+    // Wait for other tasks to complete
+    waitForBootstrapReady(taskID);
+
+    // Cleanup resources
+    closeMetadataTable();
+  }
+
+  /**
+   * Determines if the given file group should be loaded by this task.
+   * Uses round-robin assignment: file group i is assigned to task (i % 
parallelism).
+   */
+  private boolean shouldLoadBucket(int fileGroupIdx, int parallelism, int 
taskID) {
+    return fileGroupIdx % parallelism == taskID;
+  }
+
+  private void emitIndexRecord(String recordKey, HoodieRecordGlobalLocation 
location) {
+    output.collect(new StreamRecord<>(
+        new HoodieFlinkInternalRow(
+            recordKey,
+            location.getPartitionPath(),
+            location.getFileId(),
+            String.valueOf(location.getInstantTime()))));
+    loadedCnt += 1;
+  }
+
+  private void closeMetadataTable() {
+    if (metadataTable != null) {
+      try {
+        metadataTable.close();
+      } catch (Exception e) {
+        log.warn("Failed to close metadata table", e);
+      }
+      metadataTable = null;
+    }
+  }
+}
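
A tiny worked example of the round-robin assignment described in
shouldLoadBucket, with an illustrative 7 RLI file groups and parallelism 3:

    public class BucketAssignmentSketch {
      public static void main(String[] args) {
        int fileGroups = 7;   // RLI file groups in the metadata table (illustrative)
        int parallelism = 3;  // bootstrap operator parallelism (illustrative)
        for (int taskId = 0; taskId < parallelism; taskId++) {
          StringBuilder assigned = new StringBuilder();
          for (int i = 0; i < fileGroups; i++) {
            if (i % parallelism == taskId) { // same check as shouldLoadBucket
              assigned.append(i).append(' ');
            }
          }
          // Prints: subtask 0 -> 0 3 6, subtask 1 -> 1 4, subtask 2 -> 2 5
          System.out.println("subtask " + taskId + " loads file groups: " + assigned.toString().trim());
        }
      }
    }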
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
index 9bc8f8242ec5..5d599514bb0b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
@@ -100,6 +100,18 @@ public class Correspondent {
     }
   }
 
+  /**
+   * Requests coordinator to wait until all pending instants are committed if 
necessary.
+   */
+  public void awaitPendingInstantsCommitted(long checkpointId) {
+    try {
+      this.gateway.sendRequestToCoordinator(this.operatorID,
+          new 
SerializedValue<>(AwaitPendingInstantsRequest.getInstance(checkpointId))).get();
+    } catch (Exception e) {
+      throw new HoodieException("Error awaiting pending instants completion 
from coordinator", e);
+    }
+  }
+
   /**
    * A request for instant time with a given checkpoint id.
    */
@@ -153,4 +165,30 @@ public class Correspondent {
       return new InflightInstantsResponse(inflightInstants);
     }
   }
+
+  /**
+   * A request to wait until all pending instants are committed in the 
coordinator.
+   */
+  @AllArgsConstructor(access = AccessLevel.PRIVATE)
+  @Getter
+  public static class AwaitPendingInstantsRequest implements 
CoordinationRequest {
+
+    private final long checkpointId;
+
+    public static AwaitPendingInstantsRequest getInstance(long checkpointId) {
+      return new AwaitPendingInstantsRequest(checkpointId);
+    }
+  }
+
+  /**
+   * A response to indicate pending instants are all completed.
+   */
+  @AllArgsConstructor(access = AccessLevel.PRIVATE)
+  @Getter
+  public static class AwaitPendingInstantsResponse implements 
CoordinationResponse {
+
+    public static AwaitPendingInstantsResponse getInstance() {
+      return new AwaitPendingInstantsResponse();
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
index b8410e2a2386..f3bdaad5c145 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -58,20 +58,24 @@ public class IndexBackendFactory {
         ValidationUtils.checkArgument(indexState != null, "indexState should 
not be null when using FLINK_STATE index!");
         return new FlinkStateIndexBackend(indexState);
       case GLOBAL_RECORD_LEVEL_INDEX:
-        ListState<JobID> jobIdState = 
context.getOperatorStateStore().getListState(
-            new ListStateDescriptor<>(
-                "bucket-assign-job-id-state",
-                TypeInformation.of(JobID.class)
-            ));
-        long initCheckpointId = -1;
-        if (context.isRestored()) {
-          int attemptId = RuntimeContextUtils.getAttemptNumber(runtimeContext);
-          initCheckpointId = initCheckpointId(attemptId, jobIdState, 
context.getRestoredCheckpointId().orElse(-1L), runtimeContext);
+        if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+          return new 
RocksDBIndexBackend(conf.get(FlinkOptions.INDEX_BOOTSTRAP_ROCKSDB_PATH));
+        } else {
+          ListState<JobID> jobIdState = 
context.getOperatorStateStore().getListState(
+              new ListStateDescriptor<>(
+                  "bucket-assign-job-id-state",
+                  TypeInformation.of(JobID.class)
+              ));
+          long initCheckpointId = -1;
+          if (context.isRestored()) {
+            int attemptId = 
RuntimeContextUtils.getAttemptNumber(runtimeContext);
+            initCheckpointId = initCheckpointId(attemptId, jobIdState, 
context.getRestoredCheckpointId().orElse(-1L), runtimeContext);
+          }
+          // set the jobId state with current job id.
+          jobIdState.clear();
+          jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
+          return new RecordLevelIndexBackend(conf, initCheckpointId);
         }
-        // set the jobId state with current job id.
-        jobIdState.clear();
-        jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
-        return new RecordLevelIndexBackend(conf, initCheckpointId);
       default:
         throw new UnsupportedOperationException("Index type " + indexType + " 
is not supported for bucket assigning yet.");
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index ee06ac12ed5a..ad13e54bd86e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -180,6 +180,7 @@ public class IndexWriteFunction extends 
AbstractStreamWriteFunction<RowData> {
 
   @Override
   public void endInput() {
+    super.endInput();
     // flush index data buffer
     flushBuffer(true, true);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
new file mode 100644
index 000000000000..88ae067adc4d
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.RocksDBDAO;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link IndexBackend} based on RocksDB.
+ */
+public class RocksDBIndexBackend implements IndexBackend {
+  private static final String COLUMN_FAMILY = "index_cache";
+
+  private final RocksDBDAO rocksDBDAO;
+
+  public RocksDBIndexBackend(String rocksDbBasePath) {
+    this.rocksDBDAO = new RocksDBDAO("hudi-index-backend", rocksDbBasePath);
+    this.rocksDBDAO.addColumnFamily(COLUMN_FAMILY);
+  }
+
+  @Override
+  public HoodieRecordGlobalLocation get(String recordKey) {
+    return this.rocksDBDAO.get(COLUMN_FAMILY, recordKey);
+  }
+
+  @Override
+  public void update(String recordKey, HoodieRecordGlobalLocation 
recordGlobalLocation) {
+    this.rocksDBDAO.put(COLUMN_FAMILY, recordKey, recordGlobalLocation);
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.rocksDBDAO.close();
+  }
+}
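
A hedged usage sketch of the new RocksDB backend, assuming a writable local
scratch directory; the record key and the location values are placeholders:

    import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
    import org.apache.hudi.sink.partitioner.index.RocksDBIndexBackend;

    public class RocksDBIndexBackendSketch {
      public static void main(String[] args) throws Exception {
        // Each task manager is expected to use its own subdirectory in practice.
        RocksDBIndexBackend backend = new RocksDBIndexBackend("/tmp/hudi-rli-sketch");
        try {
          backend.update("key-001",
              new HoodieRecordGlobalLocation("2026/03/24", "20260324161837000", "fg-0001"));
          HoodieRecordGlobalLocation location = backend.get("key-001");
          System.out.println(location.getPartitionPath() + " / " + location.getFileId());
        } finally {
          backend.close();
        }
      }
    }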
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
index 0be33a05dffc..76836b6648b9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
@@ -20,10 +20,12 @@ package org.apache.hudi.sink.utils;
 
 import org.apache.hudi.exception.HoodieException;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 /**
  * The commit guard used for blocking instant time generation.
@@ -66,6 +68,29 @@ public class CommitGuard {
     }
   }
 
+  /**
+   * Blocks until all the pending instants are committed, or fails with a timeout.
+   *
+   * @param pendingInstants supplier of the currently pending instant times
+   */
+  public void blockFor(Supplier<List<String>> pendingInstants) {
+    lock.lock();
+    long nanos = TimeUnit.MILLISECONDS.toNanos(commitAckTimeout);
+    try {
+      while (!pendingInstants.get().isEmpty()) {
+        if (nanos <= 0L) {
+          throw new HoodieException("Timeout(" + commitAckTimeout + "ms) while 
waiting for instants ["
+              + pendingInstants.get() + "] to commit");
+        }
+        nanos = condition.awaitNanos(nanos);
+      }
+    } catch (InterruptedException e) {
+      throw new HoodieException("Blocking for instants completion is 
interrupted.", e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * Signals all waited threads which are waiting on the condition.
    */
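A small standalone sketch of the blockFor(Supplier) contract introduced above: callers block on the guard and re-check the supplier each time another thread calls unblock(), and fail with a HoodieException once the commit-ack timeout elapses. The timeout value, instant name, and thread orchestration below are illustrative only:

    import org.apache.hudi.sink.utils.CommitGuard;

    import java.util.ArrayList;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class CommitGuardSketch {
      public static void main(String[] args) throws Exception {
        CommitGuard guard = CommitGuard.create(5_000L); // 5s commit-ack timeout (illustrative)
        Set<String> pending = ConcurrentHashMap.newKeySet();
        pending.add("001");

        // Simulate the coordinator committing the instant a bit later and signalling the waiters.
        Thread committer = new Thread(() -> {
          try {
            Thread.sleep(200);
          } catch (InterruptedException ignored) {
            // not relevant for the sketch
          }
          pending.remove("001");
          guard.unblock();
        });
        committer.start();

        // Blocks until the supplier reports no pending instants, or throws after the timeout.
        guard.blockFor(() -> new ArrayList<>(pending));
        committer.join();
        System.out.println("all pending instants committed");
      }
    }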
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index 701e0f23bb3d..060a74198f5f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -49,25 +49,30 @@ public class EventBuffers implements Serializable {
   // {checkpointId -> (instant, data write events, index write events)}
   private final Map<Long, Pair<String, EventBuffer>> eventBuffers;
   private final Option<CommitGuard> commitGuardOption;
+  private final Option<CommitGuard> indexBootstrapGuardOption;
   private final int dataWriteParallelism;
   private final int indexWriteParallelism;
 
   private EventBuffers(
       Map<Long, Pair<String, EventBuffer>> eventBuffers,
       Option<CommitGuard> commitGuardOption,
+      Option<CommitGuard> indexBootstrapGuardOption,
       int dataWriteParallelism,
       int indexWriteParallelism) {
     this.eventBuffers = eventBuffers;
     this.commitGuardOption = commitGuardOption;
     this.dataWriteParallelism = dataWriteParallelism;
     this.indexWriteParallelism = indexWriteParallelism;
+    this.indexBootstrapGuardOption = indexBootstrapGuardOption;
   }
 
   public static EventBuffers getInstance(Configuration conf, int 
dataWriteParallelism) {
     final Option<CommitGuard> commitGuardOpt = 
OptionsResolver.isBlockingInstantGeneration(conf)
         ? 
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) 
: Option.empty();
+    final Option<CommitGuard> indexBootstrapGuardOption = 
OptionsResolver.isRLIWithBootstrap(conf)
+        ? 
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) 
: Option.empty();
     final int indexWriteParallelism = 
OptionsResolver.indexWriteParallelism(conf);
-    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt, 
dataWriteParallelism, indexWriteParallelism);
+    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt, 
indexBootstrapGuardOption, dataWriteParallelism, indexWriteParallelism);
   }
 
   public EventBuffer addEventToBuffer(WriteMetadataEvent event) {
@@ -143,9 +148,17 @@ public class EventBuffers implements Serializable {
     }
   }
 
+  public void awaitPrevInstantsToComplete(long checkpointId) {
+    List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
+    if (!pendingInstants.isEmpty() && 
this.indexBootstrapGuardOption.isPresent()) {
+      this.indexBootstrapGuardOption.get().blockFor(() -> 
getPendingInstantsBefore(checkpointId));
+    }
+  }
+
   public void reset(long checkpointId) {
     this.eventBuffers.remove(checkpointId);
     this.commitGuardOption.ifPresent(CommitGuard::unblock);
+    this.indexBootstrapGuardOption.ifPresent(CommitGuard::unblock);
   }
 
   public boolean nonEmpty() {
@@ -155,6 +168,13 @@ public class EventBuffers implements Serializable {
         .anyMatch(Objects::nonNull);
   }
 
+  public List<String> getPendingInstantsBefore(long checkpointId) {
+    return this.eventBuffers.entrySet().stream()
+        .filter(entry -> entry.getKey() < checkpointId)
+        .map(entry -> entry.getValue().getLeft())
+        .collect(Collectors.toList());
+  }
+
   public String getPendingInstants() {
     return 
this.eventBuffers.values().stream().map(Pair::getKey).collect(Collectors.joining(","));
   }
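The gating added above reduces to filtering the checkpoint-to-instant buffer map for entries strictly older than the current checkpoint; a self-contained sketch of that filtering, with made-up checkpoint ids and instant times standing in for the coordinator's buffers:

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    public class PendingInstantsSketch {
      public static void main(String[] args) {
        // checkpointId -> instant time, standing in for the coordinator's event buffers.
        Map<Long, String> buffers = new TreeMap<>();
        buffers.put(1L, "001");
        buffers.put(2L, "002");
        buffers.put(3L, "003");

        long currentCheckpointId = 3L;
        // Same shape as getPendingInstantsBefore(checkpointId): keep strictly older checkpoints only.
        List<String> pendingBefore = buffers.entrySet().stream()
            .filter(entry -> entry.getKey() < currentCheckpointId)
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());

        // The RLI bootstrap waits until this list drains (instants 001 and 002 here).
        System.out.println(pendingBefore);
      }
    }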
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 34d016f94960..e7fc705d4b13 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -33,6 +33,7 @@ import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteFunctions;
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.bootstrap.RLIBootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
 import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
 import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
@@ -69,9 +70,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -281,13 +284,18 @@ public class Pipelines {
     DataStream<HoodieFlinkInternalRow> dataStream1 = 
rowDataToHoodieRecord(conf, rowType, dataStream);
 
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
+      boolean isRliBootstrap = OptionsResolver.isRecordLevelIndex(conf);
+      if (isRliBootstrap) {
+        conf.set(FlinkOptions.WRITE_OPERATOR_UID, 
Pipelines.opUID("stream_write", conf));
+      }
       dataStream1 = dataStream1
           .transform(
               "index_bootstrap",
               new HoodieFlinkInternalRowTypeInfo(rowType),
-              new BootstrapOperator(conf))
+              isRliBootstrap ? new RLIBootstrapOperator(conf) : new 
BootstrapOperator(conf))
           
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
           .uid(opUID("index_bootstrap", conf));
+      ((OneInputTransformation<?, ?>) 
dataStream1.getTransformation()).setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
     return dataStream1;
@@ -351,8 +359,8 @@ public class Pipelines {
    * @return the stream write data stream pipeline
    */
   public static DataStream<RowData> hoodieStreamWrite(Configuration conf,
-                                                     RowType rowType,
-                                                     
DataStream<HoodieFlinkInternalRow> dataStream) {
+                                                      RowType rowType,
+                                                      
DataStream<HoodieFlinkInternalRow> dataStream) {
     if (OptionsResolver.isBucketIndexType(conf)) {
       HoodieIndex.BucketIndexEngineType bucketIndexEngineType = 
OptionsResolver.getBucketEngineType(conf);
       switch (bucketIndexEngineType) {
@@ -398,10 +406,12 @@ public class Pipelines {
           throw new HoodieNotSupportedException("Unknown bucket index engine 
type: " + bucketIndexEngineType);
       }
     } else {
+      // if the index is RLI, the write operator UID will be pre-generated and 
set into the configuration.
+      String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
+      writeOperatorUid = writeOperatorUid == null ? opUID("stream_write", 
conf) : writeOperatorUid;
       // uuid is used to generate operator id for the write operator, then the 
bucket assign operator can send
       // operator event to the coordinator of the write operator based on the 
operator id.
       // @see org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway.
-      String writeOperatorUid = opUID("stream_write", conf);
       DataStream<HoodieFlinkInternalRow> bucketAssignStream = 
createBucketAssignStream(dataStream, conf, rowType, writeOperatorUid);
       boolean isStreamingIndexWriteEnabled = 
OptionsResolver.isStreamingIndexWriteEnabled(conf);
       SingleOutputStreamOperator<RowData> writeDatastream =
@@ -444,7 +454,7 @@ public class Pipelines {
   private static DataStream<HoodieFlinkInternalRow> createBucketAssignStream(
       DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf, 
RowType rowType, String writeOperatorUid) {
     String assignerOperatorName = "bucket_assigner";
-    if (OptionsResolver.isRecordLevelIndex(conf)) {
+    if (OptionsResolver.isRecordLevelIndex(conf) && 
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       return inputStream
           .partitionCustom(new RecordIndexPartitioner(conf), row -> new 
HoodieKey(row.getRecordKey(), row.getPartitionPath()))
           .transform(
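To make the wiring above easier to follow, here is a sketch of the uid hand-off between the RLI bootstrap operator and the stream write operator. FlinkOptions.WRITE_OPERATOR_UID is modelled with a stand-in string option and opUID with a stand-in helper; only the pre-generate/reuse pattern itself reflects the change:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class WriteOperatorUidSketch {
      // Stand-in for FlinkOptions.WRITE_OPERATOR_UID.
      private static final ConfigOption<String> WRITE_OPERATOR_UID =
          ConfigOptions.key("write.operator.uid").stringType().noDefaultValue();

      // Stand-in for Pipelines.opUID(...), which derives the uid from the table options.
      private static String opUID(String operatorName, Configuration conf) {
        return "uid_" + operatorName;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // RLI bootstrap path: pre-generate the stream_write uid so the bootstrap operator can
        // address the write coordinator before the write operator itself is built.
        conf.set(WRITE_OPERATOR_UID, opUID("stream_write", conf));

        // Later, when the write pipeline is assembled, reuse the pre-generated uid if present.
        String writeOperatorUid = conf.get(WRITE_OPERATOR_UID);
        writeOperatorUid = writeOperatorUid == null ? opUID("stream_write", conf) : writeOperatorUid;
        System.out.println(writeOperatorUid);
      }
    }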
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 7580fd80ce56..37a57cdd6e78 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -413,6 +413,10 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
       conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
       conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), 
"true");
+      // set index bootstrap to true if it is not explicitly specified by the user.
+      if (!conf.contains(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+        conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+      }
       // generally size of index data is much smaller than data record, so set 
the buffer size of
       // the index writer as 1/4 of that for data writer if it's not set by 
user explicitly.
       if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
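The factory change above only flips index bootstrap on when the user has not set it; a tiny sketch of that "default only if absent" pattern, with a stand-in option in place of FlinkOptions.INDEX_BOOTSTRAP_ENABLED:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public class IndexBootstrapDefaultSketch {
      // Stand-in for FlinkOptions.INDEX_BOOTSTRAP_ENABLED.
      private static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED =
          ConfigOptions.key("index.bootstrap.enabled").booleanType().defaultValue(false);

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Flip the default only when the option is not configured explicitly by the user,
        // mirroring the RLI branch of the table factory above.
        if (!conf.contains(INDEX_BOOTSTRAP_ENABLED)) {
          conf.set(INDEX_BOOTSTRAP_ENABLED, true);
        }
        System.out.println(conf.get(INDEX_BOOTSTRAP_ENABLED)); // true
      }
    }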
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 499137acaf9b..a389d721908b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -234,6 +234,10 @@ public class FlinkWriteClients {
             .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                 .withEngineType(EngineType.FLINK) // this affects the default 
value inference
                 .enable(conf.get(FlinkOptions.METADATA_ENABLED))
+                .withRecordIndexFileGroupCount(
+                    
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
 "8")),
+                    
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
+                        
HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.defaultValue()
 + "")))
                 
.withMaxNumDeltaCommitsBeforeCompaction(conf.get(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                 .build())
             .withIndexConfig(StreamerUtil.getIndexConfig(conf))
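The metadata-config wiring above reads the record index file group counts from the Flink configuration; a sketch of overriding the minimum for a job, based on the new tests below. The table path/name values are placeholders, and the remaining write options are assumed to carry their defaults:

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.index.HoodieIndex;
    import org.apache.hudi.util.FlinkWriteClients;

    import org.apache.flink.configuration.Configuration;

    public class RecordIndexFileGroupSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.PATH, "/tmp/hudi_sketch_table"); // placeholder table path
        conf.set(FlinkOptions.TABLE_NAME, "sketch_table");     // placeholder table name
        conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
        // Override the minimum record-index file group count (the Flink default becomes 8 with this change).
        conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "12");

        HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
        System.out.println(writeConfig.getGlobalRecordLevelIndexMinFileGroupCount()); // 12
      }
    }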
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index a5ccb2ffb20f..0d6a375bf7a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.muttley.AthenaIngestionGateway;
 import org.apache.hudi.sink.event.Correspondent;
@@ -671,6 +672,33 @@ public class TestStreamWriteOperatorCoordinator {
     assertEquals(instant2, inflightInstants.get(2L));
   }
 
+  @Test
+  void testHandleAwaitPendingInstantsRequest() throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
+    coordinator = createCoordinator(conf, 1);
+
+    Thread t = new Thread(() -> {
+      try {
+        CompletableFuture<CoordinationResponse> responseFuture =
+            
coordinator.handleCoordinationRequest(Correspondent.AwaitPendingInstantsRequest.getInstance(1));
+        Correspondent.AwaitPendingInstantsResponse response =
+            CoordinationResponseSerDe.unwrap(responseFuture.get());
+        assertNotNull(response);
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    });
+    t.start();
+    // send a bootstrap event to unblock the simulated request from bootstrap 
operator.
+    WriteMetadataEvent event1 = createBootstrapEvent(0, 0, 
coordinator.getInstant(), "par1");
+    coordinator.handleEventFromOperator(0, event1);
+    t.join();
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java
new file mode 100644
index 000000000000..3a59193c86f5
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test cases for {@link RocksDBIndexBackend}.
+ */
+public class TestRocksDBIndexBackend {
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  void testGetAndUpdate() throws Exception {
+    try (RocksDBIndexBackend rocksDBIndexBackend = new 
RocksDBIndexBackend(tempFile.getAbsolutePath())) {
+      assertNull(rocksDBIndexBackend.get("id1"));
+
+      HoodieRecordGlobalLocation location1 = new 
HoodieRecordGlobalLocation("par1", "001", "file1");
+      rocksDBIndexBackend.update("id1", location1);
+      assertEquals(location1, rocksDBIndexBackend.get("id1"));
+
+      HoodieRecordGlobalLocation location2 = new 
HoodieRecordGlobalLocation("par2", "002", "file2");
+      rocksDBIndexBackend.update("id2", location2);
+      assertEquals(location2, rocksDBIndexBackend.get("id2"));
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestCommitGuard.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestCommitGuard.java
new file mode 100644
index 000000000000..3fc00e76d9ff
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestCommitGuard.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link CommitGuard}.
+ */
+public class TestCommitGuard {
+
+  @Test
+  void testBlockForRechecksPendingInstantsAfterSignal() throws Exception {
+    CommitGuard guard = CommitGuard.create(5_000L);
+    AtomicBoolean pending = new AtomicBoolean(true);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
+          () -> guard.blockFor(() -> pending.get() ? List.of("001", "002") : 
Collections.emptyList()), executor);
+      CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
+          () -> guard.blockFor(() -> pending.get() ? List.of("001", "002") : 
Collections.emptyList()), executor);
+
+      Thread.sleep(100);
+      guard.unblock();
+      Thread.sleep(100);
+      assertFalse(waitingFuture1.isDone(), "Should still block because pending 
instants are not empty");
+      assertFalse(waitingFuture2.isDone(), "Should still block because pending 
instants are not empty");
+
+      pending.set(false);
+      guard.unblock();
+      waitingFuture1.get(2, TimeUnit.SECONDS);
+      assertTrue(waitingFuture1.isDone(), "Should finish when pending instants 
become empty");
+      waitingFuture2.get(2, TimeUnit.SECONDS);
+      assertTrue(waitingFuture2.isDone(), "Should finish when pending instants 
become empty");
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test
+  void testBlockForTimeoutWhenPendingInstantsRemain() {
+    CommitGuard guard = CommitGuard.create(100L);
+    HoodieException hoodieException = assertThrows(
+        HoodieException.class,
+        () -> guard.blockFor(() -> 
Collections.singletonList("timeout-instant")));
+    assertTrue(hoodieException.getMessage().contains("timeout-instant"));
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
new file mode 100644
index 000000000000..40fe4e58edd0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link EventBuffers}.
+ */
+public class TestEventBuffers {
+
+  @Test
+  void testAwaitAllInstantsWaitsUntilAllPendingInstantsReset() throws 
Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
+    EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);
+
+    eventBuffers.initNewEventBuffer(1L, "001");
+    eventBuffers.addEventToBuffer(newWriteEvent(1L, "001"));
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      // two tasks block waiting on commit.
+      CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
+          eventBuffers::awaitAllInstantsToCompleteIfNecessary, executor);
+      CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
+          eventBuffers::awaitAllInstantsToCompleteIfNecessary, executor);
+
+      Thread.sleep(100);
+      assertFalse(waitingFuture1.isDone());
+      assertFalse(waitingFuture2.isDone());
+
+      eventBuffers.reset(1L);
+      Thread.sleep(100);
+      assertTrue(waitingFuture1.isDone());
+      assertTrue(waitingFuture2.isDone());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test
+  void testAwaitPrevInstantsWaitsUntilAllPreviousCheckpointsReset() throws 
Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
+    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
+    EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);
+
+    eventBuffers.initNewEventBuffer(1L, "001");
+    eventBuffers.initNewEventBuffer(2L, "002");
+    eventBuffers.initNewEventBuffer(3L, "003");
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      // two tasks block waiting on commit.
+      CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
+          () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
+      CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
+          () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
+
+      Thread.sleep(100);
+      assertFalse(waitingFuture1.isDone());
+      assertFalse(waitingFuture2.isDone());
+
+      eventBuffers.reset(1L);
+      Thread.sleep(100);
+      assertFalse(waitingFuture1.isDone(), "Checkpoint 2 still pending, should 
continue blocking");
+      assertFalse(waitingFuture2.isDone(), "Checkpoint 2 still pending, should 
continue blocking");
+
+      eventBuffers.reset(2L);
+      waitingFuture1.get(2, TimeUnit.SECONDS);
+      assertTrue(waitingFuture1.isDone());
+      waitingFuture2.get(2, TimeUnit.SECONDS);
+      assertTrue(waitingFuture2.isDone());
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  private static WriteMetadataEvent newWriteEvent(long checkpointId, String 
instant) {
+    return WriteMetadataEvent.builder()
+        .taskID(0)
+        .checkpointId(checkpointId)
+        .instantTime(instant)
+        .writeStatus(Collections.emptyList())
+        .lastBatch(true)
+        .build();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 03fd014a9b78..f577d4cfaf39 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3069,6 +3069,31 @@ public class ITTestHoodieDataSource {
             + "+I[id3, id3, Julian, 43, 1970-01-01T00:00:03, par1, par1]]");
   }
 
+  @Test
+  void testRLIBootstrap() {
+    TableEnvironment tableEnv = streamTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name())
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+    // insert another batch of records
+    execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+
+    result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_MERGED);
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testMiniBatchBucketAssign(HoodieTableType tableType) throws Exception {
@@ -3078,7 +3103,6 @@ public class ITTestHoodieDataSource {
         .options(getDefaultKeys())
         .option(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
-        .option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
         .option(FlinkOptions.TABLE_TYPE, tableType.name())
         .end();
     tableEnv.executeSql(hoodieTableDDL);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 245300445ca2..0762bd66838d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -25,6 +25,7 @@ import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.DirectWriteMarkers;
 import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers;
@@ -103,6 +105,21 @@ public class TestFlinkWriteClients {
     }
   }
 
+  @Test
+  void testDefaultGlobalRecordIndexMinFileGroupCountForFlink() {
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+    assertEquals(8, writeConfig.getGlobalRecordLevelIndexMinFileGroupCount());
+  }
+
+  @Test
+  void testUserConfiguredGlobalRecordIndexMinFileGroupCountIsNotOverridden() {
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
 "12");
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+    assertEquals(12, writeConfig.getGlobalRecordLevelIndexMinFileGroupCount());
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"", "DIRECT", "TIMELINE_SERVER_BASED"})
   void testMarkerType(String markerType) throws Exception {
