This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 93e334ce042d feat(flink): Support dynamic bucket for flink streaming 
with partitio… (#18640)
93e334ce042d is described below

commit 93e334ce042d72418c77abaa3246231b96a5615c
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat May 9 20:29:41 2026 +0800

    feat(flink): Support dynamic bucket for flink streaming with partitio… 
(#18640)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   5 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   3 +-
 .../apache/hudi/index/FlinkHoodieIndexFactory.java |   1 +
 .../apache/hudi/configuration/FlinkOptions.java    |   9 +
 .../apache/hudi/configuration/OptionsResolver.java |  15 +-
 .../sink/partitioner/BucketAssignFunction.java     |   4 +-
 .../hudi/sink/partitioner/BucketAssigner.java      |   2 +
 .../partitioner/DynamicBucketAssignFunction.java   | 164 +++++++++
 .../partitioner/DynamicBucketAssignOperator.java   |  64 ++++
 ...oner.java => GlobalRecordIndexPartitioner.java} |  40 ++-
 .../sink/partitioner/RecordIndexPartitioner.java   |  89 ++++-
 .../index/DummyPartitionedIndexBackend.java        |  46 +++
 .../partitioner/index/FlinkStateIndexBackend.java  |   4 +-
 .../{IndexBackend.java => GlobalIndexBackend.java} |  37 +-
 ...end.java => GlobalRecordLevelIndexBackend.java} |  30 +-
 .../hudi/sink/partitioner/index/IndexBackend.java  |  21 +-
 .../partitioner/index/IndexBackendFactory.java     |  17 +-
 .../sink/partitioner/index/IndexWriteFunction.java |  15 +-
 .../partitioner/index/MinibatchIndexBackend.java   |  18 +-
 .../partitioner/index/PartitionedIndexBackend.java |  47 +++
 .../partitioner/index/RecordLevelIndexBackend.java | 380 +++++++++++++++++----
 .../partitioner/index/RocksDBIndexBackend.java     |   4 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  29 +-
 .../SamplingActionExecutor.java}                   |  36 +-
 .../apache/hudi/source/stats/RecordLevelIndex.java |  12 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |  38 ++-
 .../hudi/configuration/TestOptionsResolver.java    |  19 ++
 ....java => TestGlobalRecordIndexPartitioner.java} |  10 +-
 .../partitioner/TestRecordIndexPartitioner.java    | 110 +++---
 ...java => TestGlobalRecordLevelIndexBackend.java} |  66 ++--
 .../index/TestRecordLevelIndexBackend.java         | 258 +++++++-------
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |   6 +-
 .../hudi/table/ITTestDynamicBucketStreamWrite.java | 269 +++++++++++++++
 33 files changed, 1445 insertions(+), 423 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4c1c90fe2791..db30f301d97a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.client.RunsTableService;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
@@ -1478,6 +1479,10 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
    */
   protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
tagRecordsWithLocationForStreamingWrites(HoodieData<HoodieRecord> 
untaggedRecords,
                                                                                
                              Set<String> enabledMetadataPartitions) {
+    // no need to tag if the incoming records are empty.
+    if (untaggedRecords instanceof HoodieListData && 
untaggedRecords.isEmpty()) {
+      return Pair.of(Collections.emptyList(), untaggedRecords);
+    }
     List<HoodieFileGroupId> updatedFileGroupIds = new ArrayList<>();
     // Fetch latest file slices for all enabled MDT partitions
     Map<String, List<FileSlice>> partitionToLatestFileSlices = new HashMap<>();
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1e5a691833d0..228209cf1df6 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -173,7 +173,8 @@ public class HoodieFlinkWriteClient<T>
   protected void writeToMetadataTable(boolean 
skipStreamingWritesToMetadataTable, HoodieTable table, String instantTime, 
List<HoodieWriteStat> partialMetadataTableWriteStats,
                                       HoodieCommitMetadata metadata) {
     if (!skipStreamingWritesToMetadataTable
-        && isStreamingWriteToMetadataEnabled(table)) {
+        && isStreamingWriteToMetadataEnabled(table)
+        && 
WriteOperationType.streamingWritesToMetadataSupported(getOperationType())) {
       streamingMetadataWriteHandler.commitToMetadataTable(table, instantTime, 
metadata, partialMetadataTableWriteStats);
     } else {
       super.writeToMetadataTable(skipStreamingWritesToMetadataTable, table, 
instantTime, partialMetadataTableWriteStats, metadata);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index feca357bc493..a1daae0aca01 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -48,6 +48,7 @@ public final class FlinkHoodieIndexFactory {
         // instantiates an in-memory HoodieIndex component as a placeholder.
       case GLOBAL_RECORD_LEVEL_INDEX:
         // todo: aligned with flink state index currently, may need further 
improving.
+      case RECORD_LEVEL_INDEX:
       case INMEMORY:
         return new FlinkInMemoryStateIndex(context, config);
       case BLOOM:
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 767a7457e1e7..4910c721e09c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -308,6 +308,15 @@ public class FlinkOptions extends HoodieConfig {
           + "The memory size of each individual cache within a checkpoint 
interval is dynamically calculated based on the \n"
           + "average memory size of caches for historical checkpoints.");
 
+  @AdvancedConfig
+  public static final ConfigOption<Integer> 
INDEX_RLI_CACHE_CONCURRENT_PARTITIONS_NUM = ConfigOptions
+      .key("index.rli.cache.concurrent.partitions.num")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Expected number of partitions whose partitioned RLI 
caches are updated concurrently. "
+          + "Used to infer the initial memory size for each partition cache as 
INDEX_RLI_CACHE_SIZE / concurrency "
+          + "when historical cache usage is unavailable.");
+
   @AdvancedConfig
   public static final ConfigOption<Integer> INDEX_RLI_LOOKUP_MINIBATCH_SIZE = 
ConfigOptions
       .key("index.rli.lookup.minibatch.size")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 398ca329dbb4..59f5847df4e5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -198,15 +198,23 @@ public class OptionsResolver {
   }
 
   /**
-   * Returns whether {@link 
org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction} should be used 
for bucket assigning.
+   * Returns whether partitioned record level index is used for bucket 
assigning.
    */
   public static boolean isRecordLevelIndex(Configuration conf) {
+    HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+    return indexType == HoodieIndex.IndexType.RECORD_LEVEL_INDEX;
+  }
+
+  /**
+   * Returns whether the table uses metadata-table record level index.
+   */
+  public static boolean isGlobalRecordLevelIndex(Configuration conf) {
     HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
     return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
   }
 
   public static boolean isRLIWithBootstrap(Configuration conf) {
-    return isRecordLevelIndex(conf) && 
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+    return isGlobalRecordLevelIndex(conf) && 
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
   }
 
   /**
@@ -459,7 +467,8 @@ public class OptionsResolver {
    */
   public static boolean isStreamingIndexWriteEnabled(Configuration conf) {
     return conf.get(FlinkOptions.METADATA_ENABLED)
-        && OptionsResolver.getIndexType(conf) == 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX
+        && (OptionsResolver.getIndexType(conf) == 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX
+        || OptionsResolver.getIndexType(conf) == 
HoodieIndex.IndexType.RECORD_LEVEL_INDEX)
         && 
WriteOperationType.streamingWritesToMetadataSupported(WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 6ec7c389751f..c74bf0d0dd2b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -31,7 +31,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.sink.event.Correspondent;
-import org.apache.hudi.sink.partitioner.index.IndexBackend;
+import org.apache.hudi.sink.partitioner.index.GlobalIndexBackend;
 import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.FlinkTaskContextSupplier;
@@ -84,7 +84,7 @@ public class BucketAssignFunction
    * </ul>
    */
   @Getter
-  private transient IndexBackend indexBackend;
+  private transient GlobalIndexBackend indexBackend;
 
   /**
    * Bucket assigner to assign new bucket IDs or reuse existing ones.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 819db7439816..fe848d7bcc20 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -233,6 +233,8 @@ public class BucketAssigner implements AutoCloseable {
 
   public void close() {
     reset();
+    smallFileAssignMap.clear();
+    newFileAssignStates.clear();
     WriteProfiles.clean(config.getBasePath());
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java
new file mode 100644
index 000000000000..953d837af1f2
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.adapter.KeyedProcessFunctionAdapter;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.partitioner.index.DummyPartitionedIndexBackend;
+import org.apache.hudi.sink.partitioner.index.RecordLevelIndexBackend;
+import org.apache.hudi.sink.partitioner.index.PartitionedIndexBackend;
+import org.apache.hudi.sink.partitioner.profile.WriteProfile;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.util.FlinkTaskContextSupplier;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.Setter;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Assigns Flink streaming records to dynamic bucket file groups.
+ *
+ * <p>This function first checks the partition-scoped RLI backend for an 
existing
+ * {@code recordKey -> fileGroupId} mapping. Existing keys are routed as 
updates to
+ * the recorded file group; new keys are assigned by {@link BucketAssigner} 
and then
+ * written back to the backend so the streaming metadata writer can persist 
the assignment to RLI.
+ */
+public class DynamicBucketAssignFunction
+    extends KeyedProcessFunctionAdapter<String, HoodieFlinkInternalRow, 
HoodieFlinkInternalRow>
+    implements CheckpointedFunction, CheckpointListener {
+
+  private final Configuration conf;
+  private final boolean isInsertOverwrite;
+
+  private transient PartitionedIndexBackend indexBackend;
+  private transient BucketAssigner bucketAssigner;
+
+  @Setter
+  protected transient Correspondent correspondent;
+
+  private transient int maxParallelism;
+  private transient int numTasks;
+  private transient int taskId;
+
+  /**
+   * Creates the dynamic bucket assign function for one bucket assign operator.
+   *
+   * @param conf Flink write configuration
+   */
+  public DynamicBucketAssignFunction(Configuration conf) {
+    this.conf = conf;
+    this.isInsertOverwrite = OptionsResolver.isInsertOverwrite(conf);
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(this.conf, 
!OptionsResolver.isIncrementalJobGraph(conf));
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+        
HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHadoopConf(this.conf)),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    boolean delta = 
HoodieTableType.valueOf(conf.get(FlinkOptions.TABLE_TYPE)).equals(HoodieTableType.MERGE_ON_READ);
+    WriteProfile writeProfile = WriteProfiles.singleton(isInsertOverwrite, 
delta, writeConfig, context);
+    this.bucketAssigner = new BucketAssigner(
+        RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()),
+        
RuntimeContextUtils.getMaxNumberOfParallelSubtasks(getRuntimeContext()),
+        RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()),
+        writeProfile,
+        writeConfig);
+    this.maxParallelism = 
RuntimeContextUtils.getMaxNumberOfParallelSubtasks(getRuntimeContext());
+    this.numTasks = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+    this.taskId = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    this.indexBackend = isInsertOverwrite
+        ? new DummyPartitionedIndexBackend()
+        : new RecordLevelIndexBackend(conf, (partitionPath, recordKey, fileId) 
-> isRecordKeyOfThisTask(recordKey));
+  }
+
+  private boolean isRecordKeyOfThisTask(String recordKey) {
+    return KeyGroupRangeAssignment.assignKeyToParallelOperator(recordKey, 
maxParallelism, numTasks) == taskId;
+  }
+
+  @Override
+  public void processElement(HoodieFlinkInternalRow record, Context ctx, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
+    String partitionPath = record.getPartitionPath();
+    String recordKey = record.getRecordKey();
+    String fileGroupId = indexBackend.get(partitionPath, recordKey);
+
+    BucketInfo bucketInfo;
+    // `operationType` in the record is used to generate index record in the 
following writer operator.
+    // Currently, we only emit INSERT index record and ignore DELETE index 
record, because we cannot be
+    // certain whether data with the same key in storage will actually be 
deleted. It's possible that
+    // data in storage is deleted, but the record level index data remains.
+    // todo: support ordering value in record level index metadata payload, 
since the efficiency of location
+    // tagging by merging lookup is intolerable in flink streaming writing 
scenario.
+    if (fileGroupId != null) {
+      bucketInfo = bucketAssigner.addUpdate(partitionPath, fileGroupId);
+    } else {
+      bucketInfo = bucketAssigner.addInsert(partitionPath);
+      indexBackend.update(partitionPath, recordKey, 
bucketInfo.getFileIdPrefix());
+      record.setOperationType("I");
+    }
+
+    String instantTime = bucketInfo.getBucketType() == BucketType.INSERT ? "I" 
: "U";
+    record.setInstantTime(instantTime);
+    record.setFileId(bucketInfo.getFileIdPrefix());
+    out.collect(record);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) {
+    this.bucketAssigner.reset();
+    this.indexBackend.onCheckpoint(context.getCheckpointId());
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {
+    this.bucketAssigner.reload(checkpointId);
+    this.indexBackend.onCheckpointComplete(this.correspondent, checkpointId);
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.indexBackend.close();
+    if (this.bucketAssigner != null) {
+      this.bucketAssigner.close();
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignOperator.java
new file mode 100644
index 000000000000..b26ef25d77a6
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.sink.event.Correspondent;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Operator wrapper that wires the dynamic bucket assign function to the data 
write coordinator.
+ */
+public class DynamicBucketAssignOperator extends KeyedProcessOperator<String, 
HoodieFlinkInternalRow, HoodieFlinkInternalRow> {
+
+  /**
+   * The dynamic bucket assign function.
+   */
+  private final DynamicBucketAssignFunction bucketAssignFunction;
+
+  /**
+   * OperatorId for the data write operator.
+   */
+  private final OperatorID dataWriteOperatorId;
+
+  /**
+   * Creates an operator wrapper for dynamic bucket assignment.
+   *
+   * @param bucketAssignFunction function that performs partitioned-RLI-backed 
assignment
+   * @param dataWriteOperatorId operator id of the downstream data write 
operator coordinator
+   */
+  public DynamicBucketAssignOperator(DynamicBucketAssignFunction 
bucketAssignFunction, OperatorID dataWriteOperatorId) {
+    super(bucketAssignFunction);
+    this.bucketAssignFunction = bucketAssignFunction;
+    this.dataWriteOperatorId = dataWriteOperatorId;
+  }
+
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+    super.setup(containingTask, config, output);
+    
this.bucketAssignFunction.setCorrespondent(Correspondent.getInstance(dataWriteOperatorId,
+        
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway()));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
similarity index 65%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
index e80b7bba25be..8b6f1ae22ad9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -31,12 +32,13 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * Record index input partitioner, which is aligned with the mapping of record 
key
- * to the file group of record index partition in metadata table. It prevents 
multiple
- * index write subtasks from writing the same record index file group, thereby 
effectively
- * reducing the number of small files.
+ * Global record index input partitioner.
+ *
+ * <p>The partitioner is aligned with the record-key to metadata-table 
file-group mapping used by
+ * global RLI. This prevents multiple index write subtasks from writing the 
same record-index file
+ * group and reduces small files in the metadata table.
  */
-public class RecordIndexPartitioner implements Partitioner<HoodieKey> {
+public class GlobalRecordIndexPartitioner implements Partitioner<HoodieKey> {
   private final Configuration conf;
   /**
    * The number of file groups for record index partition in metadata data 
table. The number
@@ -45,19 +47,30 @@ public class RecordIndexPartitioner implements 
Partitioner<HoodieKey> {
    */
   private int numFileGroupsForRecordIndexPartition = -1;
 
-  public RecordIndexPartitioner(Configuration conf) {
+  /**
+   * Creates a partitioner for global RLI index writes.
+   *
+   * @param conf Flink write configuration
+   */
+  public GlobalRecordIndexPartitioner(Configuration conf) {
     this.conf = conf;
   }
 
+  /**
+   * Routes an index row to the writer responsible for its global RLI file 
group.
+   *
+   * @param recordKey index row key, where the partition path is ignored for 
global RLI
+   * @param numPartitions downstream index writer parallelism
+   * @return downstream subtask index
+   */
   @Override
   public int partition(HoodieKey recordKey, int numPartitions) {
     // initialize numFileGroupsForRecordIndexPartition lazily.
     if (numFileGroupsForRecordIndexPartition < 0) {
       numFileGroupsForRecordIndexPartition = 
getNumFileGroupsForRecordIndexPartition();
     }
-    // note: the hashing is in line with GLOBAL_RECORD_LEVEL_INDEX currently.
-    // if partitioned record level index is supported, the partition path 
should be considered here as well.
-    int fgIndex = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey.getRecordKey(), 
numFileGroupsForRecordIndexPartition);
+    int fgIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(
+        recordKey.getRecordKey(), numFileGroupsForRecordIndexPartition);
     return fgIndex % numPartitions;
   }
 
@@ -66,11 +79,14 @@ public class RecordIndexPartitioner implements 
Partitioner<HoodieKey> {
    */
   private int getNumFileGroupsForRecordIndexPartition() {
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-    HoodieTableMetadata metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
+    try (HoodieTableMetadata metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
         HoodieFlinkEngineContext.DEFAULT,
         metaClient.getStorage(),
         StreamerUtil.metadataConfig(conf),
-        conf.get(FlinkOptions.PATH));
-    return 
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+        conf.get(FlinkOptions.PATH))) {
+      return 
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get file group count for global 
record index partition.", e);
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
index e80b7bba25be..a4597e00f099 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/RecordIndexPartitioner.java
@@ -19,9 +19,13 @@
 package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -30,47 +34,96 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
- * Record index input partitioner, which is aligned with the mapping of record 
key
+ * Partitioned record index input partitioner, which is aligned with the 
mapping of record key
  * to the file group of record index partition in metadata table. It prevents 
multiple
  * index write subtasks from writing the same record index file group, thereby 
effectively
  * reducing the number of small files.
+ *
+ * <p>Partitioned RLI stores record-index file groups under the corresponding 
data table
+ * partition. Since RLI file groups for different data partitions can have the 
same bucket id,
+ * shuffling by bucket id alone would route those equally numbered buckets 
to the same
+ * downstream task and can easily cause data skew. Therefore an index row must 
be routed by
+ * its data partition first, then by the record-key hash within that 
partition's RLI file
+ * group range.
  */
 public class RecordIndexPartitioner implements Partitioner<HoodieKey> {
   private final Configuration conf;
+  private Map<String, Integer> partitionedRLIFileGroupCounts;
+  private Functions.Function3<Integer, String, Integer, Integer> 
partitionIndexFunc;
+
   /**
-   * The number of file groups for record index partition in metadata data 
table. The number
-   * cannot be calculated during compiling the writing pipeline, since the 
hoodie table may
-   * not be created yet, so the number is lazily calculated during job running.
+   * Creates a partitioner for partitioned RLI index writes.
+   *
+   * @param conf Flink write configuration
    */
-  private int numFileGroupsForRecordIndexPartition = -1;
-
   public RecordIndexPartitioner(Configuration conf) {
     this.conf = conf;
   }
 
+  /**
+   * Routes an index row by data partition and record-key-derived partitioned 
RLI file group.
+   *
+   * @param recordKey index row key containing both record key and data 
partition path
+   * @param numPartitions downstream index writer parallelism
+   * @return downstream subtask index
+   */
   @Override
   public int partition(HoodieKey recordKey, int numPartitions) {
-    // initialize numFileGroupsForRecordIndexPartition lazily.
-    if (numFileGroupsForRecordIndexPartition < 0) {
-      numFileGroupsForRecordIndexPartition = 
getNumFileGroupsForRecordIndexPartition();
+    if (partitionedRLIFileGroupCounts == null) {
+      partitionedRLIFileGroupCounts = getPartitionedRLIFileGroupCounts();
+    }
+    int fileGroupCount = 
getFileGroupCountForPartitionedRLI(recordKey.getPartitionPath());
+    int fgIndex = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey.getRecordKey(), 
fileGroupCount);
+    if (partitionIndexFunc == null) {
+      partitionIndexFunc = 
BucketIndexUtil.getPartitionIndexFunc(numPartitions);
     }
-    // note: the hashing is in line with GLOBAL_RECORD_LEVEL_INDEX currently.
-    // if partitioned record level index is supported, the partition path 
should be considered here as well.
-    int fgIndex = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey.getRecordKey(), 
numFileGroupsForRecordIndexPartition);
-    return fgIndex % numPartitions;
+    return partitionIndexFunc.apply(fileGroupCount, 
recordKey.getPartitionPath(), fgIndex);
   }
 
   /**
-   * Get the number of file groups for record index partition in metadata 
table.
+   * Get the file group count for each data partition in the partitioned 
record index.
    */
-  private int getNumFileGroupsForRecordIndexPartition() {
+  private Map<String, Integer> getPartitionedRLIFileGroupCounts() {
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-    HoodieTableMetadata metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
+    if 
(!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX))
 {
+      return Collections.emptyMap();
+    }
+    try (HoodieTableMetadata metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
         HoodieFlinkEngineContext.DEFAULT,
         metaClient.getStorage(),
         StreamerUtil.metadataConfig(conf),
-        conf.get(FlinkOptions.PATH));
-    return 
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+        conf.get(FlinkOptions.PATH))) {
+      Map<String, Integer> fileGroupCounts = new HashMap<>();
+      
metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX)
+          .forEach((partitionPath, fileSlices) -> 
fileGroupCounts.put(partitionPath, fileSlices.size()));
+      return fileGroupCounts;
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get file group counts for 
partitioned record index.", e);
+    }
+  }
+
+  /**
+   * Get the partitioned record index file group count for the given data 
partition.
+   */
+  private int getFileGroupCountForPartitionedRLI(String partitionPath) {
+    int fileGroupCount = 
partitionedRLIFileGroupCounts.getOrDefault(partitionPath, 0);
+    // HoodieBackedTableMetadataWriter initializes record-index file groups 
for a newly seen
+    // data partition with RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP, so 
the writer-side
+    // partitioner should use the same count before that partition appears in 
the MDT view.
+    return fileGroupCount > 0 ? fileGroupCount : 
getMinFileGroupCountForPartitionedRLI();
+  }
+
+  /**
+   * Get the minimum file group count used to initialize newly seen 
partitioned record index partitions.
+   */
+  private int getMinFileGroupCountForPartitionedRLI() {
+    return Integer.parseInt(conf.getString(
+        
HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
+        
HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.defaultValue().toString()));
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/DummyPartitionedIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/DummyPartitionedIndexBackend.java
new file mode 100644
index 000000000000..2fe8c4784294
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/DummyPartitionedIndexBackend.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import java.io.IOException;
+
+/**
+ * A no-op implementation of {@link PartitionedIndexBackend} that does not 
persist any index state.
+ *
+ * <p>Used as a placeholder when partitioned record-level indexing is disabled 
or not applicable.
+ * All lookups return {@code null} (record not found), and writes are silently 
discarded.
+ */
+public class DummyPartitionedIndexBackend implements PartitionedIndexBackend {
+
+  @Override
+  public String get(String partitionPath, String recordKey) {
+    return null;
+  }
+
+  @Override
+  public void update(String partitionPath, String recordKey, String fileId) {
+    // do nothing
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
index c952f8f047d1..b4ea70949232 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.state.ValueState;
 import java.io.IOException;
 
 /**
- * An implementation of {@link IndexBackend} based on flink keyed value state.
+ * An implementation of {@link GlobalIndexBackend} based on flink keyed value 
state.
  */
-public class FlinkStateIndexBackend implements IndexBackend {
+public class FlinkStateIndexBackend implements GlobalIndexBackend {
 
   private final ValueState<HoodieRecordGlobalLocation> indexState;
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalIndexBackend.java
similarity index 56%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalIndexBackend.java
index 636ca4e6b0d3..12fab0631b03 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalIndexBackend.java
@@ -19,18 +19,13 @@
 package org.apache.hudi.sink.partitioner.index;
 
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
-import org.apache.hudi.sink.event.Correspondent;
 
-import org.apache.flink.metrics.MetricGroup;
-
-import java.io.Closeable;
 import java.io.IOException;
 
 /**
- * An interface that provides an abstraction for managing record location 
mappings in the index system.
- * It serves as the backend for storing and retrieving the location of records 
identified by their unique keys.
+ * Index backend for managing global record location mappings keyed by record 
key.
  */
-public interface IndexBackend extends Closeable {
+public interface GlobalIndexBackend extends IndexBackend {
 
   /**
    * Retrieves the global location of a record based on its key.
@@ -47,32 +42,4 @@ public interface IndexBackend extends Closeable {
    * @param recordGlobalLocation the new global location of the record
    */
   void update(String recordKey, HoodieRecordGlobalLocation 
recordGlobalLocation) throws IOException;
-
-  /**
-   * Listener method called when the bucket assign operator finishes the 
checkpoint with {@code checkpointId}.
-   *
-   * @param checkpointId checkpoint id.
-   */
-  default void onCheckpoint(long checkpointId) {
-    // do nothing.
-  }
-
-  /**
-   * Listener method called when the bucket assign operator receives a notify 
checkpoint complete event.
-   *
-   * @param correspondent         The Correspondent used to get inflight 
instants from the coordinator.
-   * @param completedCheckpointId The latest completed checkpoint id
-   */
-  default void onCheckpointComplete(Correspondent correspondent, long 
completedCheckpointId) {
-    // do nothing.
-  }
-
-  /**
-   * Registers metrics for this backend.
-   *
-   * @param metricGroup flink metric group
-   */
-  default void registerMetrics(MetricGroup metricGroup) {
-    // do nothing.
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
similarity index 80%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index 6fdecb0f22c4..cef722c87a4b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -44,10 +44,14 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * An implementation of {@link IndexBackend} based on the record level index 
in metadata table.
+ * Global record-level-index backend backed by the metadata table.
+ *
+ * <p>The backend serves the global RLI path: lookups are keyed by record key 
only and return
+ * {@link HoodieRecordGlobalLocation}. A checkpoint-aware {@link 
RecordIndexCache} keeps recently
+ * accessed locations locally and falls back to metadata table reads for cache 
misses.
  */
 @Slf4j
-public class RecordLevelIndexBackend implements MinibatchIndexBackend {
+public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
   @VisibleForTesting
   @Getter
   private final RecordIndexCache recordIndexCache;
@@ -55,7 +59,13 @@ public class RecordLevelIndexBackend implements 
MinibatchIndexBackend {
   private final HoodieTableMetaClient metaClient;
   private HoodieTableMetadata metadataTable;
 
-  public RecordLevelIndexBackend(Configuration conf, long initCheckpointId) {
+  /**
+   * Creates a global RLI backend with a checkpoint-aware cache.
+   *
+   * @param conf Flink write configuration
+   * @param initCheckpointId restored checkpoint id used to seed cache 
eviction state, or {@code -1}
+   */
+  public GlobalRecordLevelIndexBackend(Configuration conf, long 
initCheckpointId) {
     this.metaClient = StreamerUtil.createMetaClient(conf);
     this.conf = conf;
     this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
@@ -102,11 +112,25 @@ public class RecordLevelIndexBackend implements 
MinibatchIndexBackend {
     recordKeysAndLocations.forEach(keyAndLocation -> 
recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue()));
   }
 
+  /**
+   * Starts a new checkpoint generation in the local record-index cache.
+   *
+   * @param checkpointId checkpoint id from the bucket assign operator
+   */
   @Override
   public void onCheckpoint(long checkpointId) {
     recordIndexCache.addCheckpointCache(checkpointId);
   }
 
+  /**
+   * Marks older cache generations evictable after a checkpoint completes.
+   *
+   * <p>The minimum inflight checkpoint is used as the retention boundary. If 
no instant is inflight,
+   * the completed checkpoint id is safe because the writer coordinator has 
already advanced past it.
+   *
+   * @param correspondent writer coordinator correspondent used to query 
inflight instants
+   * @param completedCheckpointID completed checkpoint id reported by Flink
+   */
   @Override
   public void onCheckpointComplete(Correspondent correspondent, long 
completedCheckpointID) {
     Map<Long, String> inflightInstants = 
correspondent.requestInflightInstants();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
index 636ca4e6b0d3..d0ec6eb6a7c4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -18,36 +18,17 @@
 
 package org.apache.hudi.sink.partitioner.index;
 
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.sink.event.Correspondent;
 
 import org.apache.flink.metrics.MetricGroup;
 
 import java.io.Closeable;
-import java.io.IOException;
 
 /**
- * An interface that provides an abstraction for managing record location 
mappings in the index system.
- * It serves as the backend for storing and retrieving the location of records 
identified by their unique keys.
+ * An interface that provides common lifecycle hooks for index backends.
  */
 public interface IndexBackend extends Closeable {
 
-  /**
-   * Retrieves the global location of a record based on its key.
-   *
-   * @param recordKey the unique key identifying the record
-   * @return the global location of the record, or null if the record is not 
found in the index
-   */
-  HoodieRecordGlobalLocation get(String recordKey) throws IOException;
-
-  /**
-   * Updates the global location of a record in the index.
-   *
-   * @param recordKey the unique key identifying the record
-   * @param recordGlobalLocation the new global location of the record
-   */
-  void update(String recordKey, HoodieRecordGlobalLocation 
recordGlobalLocation) throws IOException;
-
   /**
    * Listener method called when the bucket assign operator finishes the 
checkpoint with {@code checkpointId}.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
index 4e9df8d9fcbd..01a2e04224a5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -39,10 +39,21 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import java.util.stream.StreamSupport;
 
 /**
- * Factory to create an {@link IndexBackend} based on the configured index 
type.
+ * Factory to create a {@link GlobalIndexBackend} based on the configured 
index type.
  */
 public class IndexBackendFactory {
-  public static IndexBackend create(Configuration conf, 
FunctionInitializationContext context, RuntimeContext runtimeContext) throws 
Exception {
+  /**
+   * Creates the global index backend used by the legacy bucket assign 
function.
+   *
+   * <p>Flink state index stores locations in keyed state. Global RLI either 
uses the bootstrap
+   * RocksDB cache or a metadata-table-backed backend with checkpoint-aware 
cache eviction.
+   *
+   * @param conf Flink write configuration
+   * @param context Flink function initialization context
+   * @param runtimeContext Flink runtime context for job and attempt metadata
+   * @return global index backend for record-key lookups
+   */
+  public static GlobalIndexBackend create(Configuration conf, 
FunctionInitializationContext context, RuntimeContext runtimeContext) throws 
Exception {
     HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
     switch (indexType) {
       case FLINK_STATE:
@@ -74,7 +85,7 @@ public class IndexBackendFactory {
           // set the jobId state with current job id.
           jobIdState.clear();
           jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
-          return new RecordLevelIndexBackend(conf, initCheckpointId);
+          return new GlobalRecordLevelIndexBackend(conf, initCheckpointId);
         }
       default:
         throw new UnsupportedOperationException("Index type " + indexType + " 
is not supported for bucket assigning yet.");
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index ad13e54bd86e..8e64d58a97b3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -51,6 +51,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 /**
  * A Flink stream writing function that handles writing index records to the 
metadata table.
@@ -83,6 +84,8 @@ public class IndexWriteFunction extends 
AbstractStreamWriteFunction<RowData> {
    */
   private transient HoodieFlinkTable<?> flinkTable;
 
+  private transient Function<RowData, String> dedupKeyExtractor;
+
   public IndexWriteFunction(Configuration conf) {
     super(conf, true);
   }
@@ -95,6 +98,8 @@ public class IndexWriteFunction extends 
AbstractStreamWriteFunction<RowData> {
         config,
         config.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE) * 1024 * 1024);
     this.indexDataBuffer = 
BufferUtils.createBuffer(IndexRowUtils.INDEX_ROW_TYPE, memorySegmentPool);
+    this.dedupKeyExtractor = 
this.writeClient.getConfig().isRecordLevelIndexEnabled()
+        ? IndexWriteFunction::getPartitionedDedupKey : 
IndexRowUtils::getRecordKey;
   }
 
   @Override
@@ -171,13 +176,19 @@ public class IndexWriteFunction extends 
AbstractStreamWriteFunction<RowData> {
     Map<String, HoodieRecord> keyAndRecordMap = new LinkedHashMap<>();
     while (rowItr.hasNext()) {
       RowData indexRow = rowItr.next();
-      String recordKey = IndexRowUtils.getRecordKey(indexRow);
-      keyAndRecordMap.put(recordKey, 
IndexRowUtils.convertToHoodieRecord(this.currentInstant, indexRow, 
writeConfig));
+      keyAndRecordMap.put(
+          dedupKeyExtractor.apply(indexRow),
+          IndexRowUtils.convertToHoodieRecord(this.currentInstant, indexRow, 
writeConfig));
       dataPartitions.add(IndexRowUtils.getPartition(indexRow));
     }
     return Pair.of(new ArrayList<>(keyAndRecordMap.values()), dataPartitions);
   }
 
+  private static String getPartitionedDedupKey(RowData indexRow) {
+    String recordKey = IndexRowUtils.getRecordKey(indexRow);
+    return IndexRowUtils.getPartition(indexRow) + "/" + recordKey;
+  }
+
   @Override
   public void endInput() {
     super.endInput();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
index 0b79ee10bcdf..cdfceb5cd49c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
@@ -28,9 +28,23 @@ import java.util.Map;
 /**
  * Index delegator which supports mini-batch index operations.
  */
-public interface MinibatchIndexBackend extends IndexBackend {
+public interface MinibatchIndexBackend extends GlobalIndexBackend {
 
-  Map<String, HoodieRecordGlobalLocation> get(List<String> recordKey) throws 
IOException;
+  /**
+   * Retrieves locations for a batch of record keys.
+   *
+   * <p>The returned map should contain one entry for each requested key and 
preserve the request
+   * order when the implementation can do so. Missing keys should map to 
{@code null}.
+   *
+   * @param recordKeys record keys to look up
+   * @return record-key to global-location mapping
+   */
+  Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws 
IOException;
 
+  /**
+   * Updates locations for a batch of record keys.
+   *
+   * @param recordKeysAndLocations record-key and location pairs to write into 
the backend
+   */
   void update(List<Pair<String, HoodieRecordGlobalLocation>> 
recordKeysAndLocations) throws IOException;
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/PartitionedIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/PartitionedIndexBackend.java
new file mode 100644
index 000000000000..3a2e583f41d9
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/PartitionedIndexBackend.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+/**
+ * Index backend for partition-scoped record-to-file-group mappings.
+ *
+ * <p>Unlike {@link GlobalIndexBackend}, this backend is addressed by both 
data partition and record
+ * key. It is used by the partitioned RLI based dynamic bucket assignment 
path, where the persisted
+ * index value is the file group id selected by {@code BucketAssigner}.
+ */
+public interface PartitionedIndexBackend extends IndexBackend {
+
+  /**
+   * Retrieves the file group id for a record in the given partition.
+   *
+   * @param partitionPath the partition path of the record
+   * @param recordKey the unique key identifying the record
+   * @return the file group id, or null if the record is not found in the index
+   */
+  String get(String partitionPath, String recordKey);
+
+  /**
+   * Updates the file group id for a record in the given partition.
+   *
+   * @param partitionPath the partition path of the record
+   * @param recordKey the unique key identifying the record
+   * @param fileId the file group id, usually the file id prefix produced by 
bucket assignment
+   */
+  void update(String partitionPath, String recordKey, String fileId);
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index 6fdecb0f22c4..cc44a3e154f5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,108 +19,201 @@
 package org.apache.hudi.sink.partitioner.index;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.utils.SamplingActionExecutor;
+import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * An implementation of {@link IndexBackend} based on the record level index 
in metadata table.
+ * Partition-scoped index backend backed by partitioned record level index.
+ *
+ * <p>The backend keeps a lazy cache per data partition. Each cache maps 
record keys owned by the
+ * current Flink assign subtask to the file group id stored in the partitioned 
RLI. New records are
+ * added to the cache after bucket assignment, while existing records keep 
their original RLI routing.
+ *
+ * <p>Partition caches are evicted lazily: checkpoint completion only advances 
the safe eviction
+ * watermark, and cleanup happens when the total in-memory cache size exceeds
+ * {@link FlinkOptions#INDEX_RLI_CACHE_SIZE}.
  */
 @Slf4j
-public class RecordLevelIndexBackend implements MinibatchIndexBackend {
-  @VisibleForTesting
-  @Getter
-  private final RecordIndexCache recordIndexCache;
+public class RecordLevelIndexBackend implements PartitionedIndexBackend {
+
   private final Configuration conf;
+  private final HoodieWriteConfig writeConfig;
   private final HoodieTableMetaClient metaClient;
+  private final long maxCacheSizeInBytes;
+  private final BootstrapFilter bootstrapFilter;
   private HoodieTableMetadata metadataTable;
 
-  public RecordLevelIndexBackend(Configuration conf, long initCheckpointId) {
-    this.metaClient = StreamerUtil.createMetaClient(conf);
+  @Getter
+  private final Map<String, BucketCache> partitionBucketCaches = new 
LinkedHashMap<>(16, 0.75f, true);
+  private long currentCheckpointId = -1L;
+  private long minRetainedCheckpointId = Long.MIN_VALUE;
+  private final SamplingActionExecutor cleanExecutor = new 
SamplingActionExecutor();
+
+  /**
+   * Creates a partitioned RLI backend with custom ownership filtering.
+   *
+   * <p>This constructor is used by simple bucket index, whose write ownership 
is determined by
+   * partition and bucket file id rather than Flink record-key key groups.
+   *
+   * @param conf Flink write configuration
+   * @param bootstrapFilter filter for deciding whether a bootstrapped RLI 
record belongs to this task
+   */
+  public RecordLevelIndexBackend(Configuration conf, BootstrapFilter 
bootstrapFilter) {
     this.conf = conf;
-    this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
+    this.writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, 
false);
+    this.metaClient = StreamerUtil.createMetaClient(conf);
+    this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) * 
1024 * 1024;
+    this.bootstrapFilter = bootstrapFilter;
     reloadMetadataTable();
   }
 
   @Override
-  public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
-    return get(Collections.singletonList(recordKey)).get(recordKey);
+  public String get(String partitionPath, String recordKey) {
+    BucketCache cache = getOrBootstrapPartition(partitionPath);
+    return cache.getFileGroupId(recordKey);
   }
 
   @Override
-  public void update(String recordKey, HoodieRecordGlobalLocation 
recordGlobalLocation) {
-    recordIndexCache.update(recordKey, recordGlobalLocation);
+  public void update(String partitionPath, String recordKey, String fileId) {
+    BucketCache cache = getOrBootstrapPartition(partitionPath);
+    cache.putRecordKey(recordKey, fileId);
+    cleanExecutor.runIfNecessary(() -> cleanIfNecessary(0L, partitionPath));
   }
 
+  /**
+   * Records the latest checkpoint id seen by this assign subtask.
+   *
+   * <p>New record-key mappings written after this point are tagged with the 
checkpoint id so lazy
+   * eviction can avoid removing partition caches that may still be needed for 
recovery.
+   *
+   * @param checkpointId checkpoint id from the bucket assign operator
+   */
   @Override
-  public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) 
throws IOException {
-    // use a linked hash map to keep the natural order.
-    Map<String, HoodieRecordGlobalLocation> keysAndLocations = new 
LinkedHashMap<>();
-    List<String> missedKeys = new ArrayList<>();
-    for (String key: recordKeys) {
-      HoodieRecordGlobalLocation location = recordIndexCache.get(key);
-      if (location == null) {
-        missedKeys.add(key);
-      }
-      // insert anyway even the location is null to keep the natural order.
-      keysAndLocations.put(key, location);
-    }
-    if (!missedKeys.isEmpty()) {
-      HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
-          
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
-      List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = 
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
-      recordIndexLocations.forEach(keyAndLocation -> {
-        recordIndexCache.update(keyAndLocation.getKey(), 
keyAndLocation.getValue());
-        keysAndLocations.put(keyAndLocation.getKey(), 
keyAndLocation.getValue());
-      });
-    }
-    return keysAndLocations;
+  public void onCheckpoint(long checkpointId) {
+    this.currentCheckpointId = checkpointId;
   }
 
+  /**
+   * Advances the lazy eviction watermark after a completed checkpoint.
+   *
+   * <p>The watermark is derived from the minimum inflight checkpoint so 
caches that may still be
+   * needed for recovery are retained. Metadata table state is reloaded after 
the coordinator has
+   * committed or advanced instants for the completed checkpoint.
+   *
+   * @param correspondent writer coordinator correspondent used to query 
inflight instants
+   * @param completedCheckpointId completed checkpoint id reported by Flink
+   */
   @Override
-  public void update(List<Pair<String, HoodieRecordGlobalLocation>> 
recordKeysAndLocations) throws IOException {
-    recordKeysAndLocations.forEach(keyAndLocation -> 
recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue()));
+  public void onCheckpointComplete(Correspondent correspondent, long 
completedCheckpointId) {
+    Map<Long, String> inflightInstants = 
correspondent.requestInflightInstants();
+    
updateEvictableCkp(inflightInstants.keySet().stream().min(Long::compareTo).orElse(completedCheckpointId));
+    metaClient.reloadActiveTimeline();
+    reloadMetadataTable();
   }
 
-  @Override
-  public void onCheckpoint(long checkpointId) {
-    recordIndexCache.addCheckpointCache(checkpointId);
+  private BucketCache getOrBootstrapPartition(String partitionPath) {
+    BucketCache cache = partitionBucketCaches.get(partitionPath);
+    if (cache != null) {
+      return cache;
+    }
+
+    cache = bootstrapPartition(partitionPath);
+    partitionBucketCaches.put(partitionPath, cache);
+    return cache;
   }
 
-  @Override
-  public void onCheckpointComplete(Correspondent correspondent, long 
completedCheckpointID) {
-    Map<Long, String> inflightInstants = 
correspondent.requestInflightInstants();
-    log.info("Inflight instants and the corresponding checkpoints: {}, 
notified completed checkpoints: {}",
-        inflightInstants, completedCheckpointID);
-    // if there are no inflight instants,
-    // the latest completed checkpoint id is used as the minimum checkpoint id,
-    // since the streaming write operator always uses previous checkpoint id 
to request the new instant.
-    
recordIndexCache.markAsEvictable(inflightInstants.keySet().stream().min(Long::compareTo).orElse(completedCheckpointID));
-    this.metaClient.reloadActiveTimeline();
-    reloadMetadataTable();
+  /**
+   * Builds the cache of one data partition from the partitioned record level index.
+   *
+   * <p>The cache is returned empty when the record index metadata partition is not available,
+   * or when the index lists no file slices for the partition. Otherwise every index record read
+   * for the partition is offered to the bootstrap filter and only accepted records are cached.
+   *
+   * @param partitionPath data partition to bootstrap
+   * @return the cache created for the partition, possibly empty
+   * @throws HoodieException if reading the record index fails; the new cache is closed first
+   */
+  private BucketCache bootstrapPartition(String partitionPath) {
+    final BucketCache partitionCache = createBucketCache(partitionPath);
+    final boolean recordIndexAvailable =
+        metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX);
+    if (!recordIndexAvailable) {
+      log.info("Record index is not available yet. Start partitioned RLI cache empty for partition {}", partitionPath);
+      return partitionCache;
+    }
+
+    try {
+      Map<String, List<FileSlice>> fileGroupsByPartition =
+          metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX);
+      List<FileSlice> partitionSlices = fileGroupsByPartition.get(partitionPath);
+      boolean hasIndexData = partitionSlices != null && !partitionSlices.isEmpty();
+      if (hasIndexData) {
+        HoodiePairData<String, HoodieRecordGlobalLocation> indexRecords =
+            metadataTable.readRecordIndexLocations(ignoredSlices -> partitionSlices);
+        // Counts every index record visited, including those rejected by the filter.
+        final AtomicLong visitedCnt = new AtomicLong();
+        indexRecords.forEach(keyAndLocation -> {
+          String key = keyAndLocation.getLeft();
+          String fileGroupId = keyAndLocation.getRight().getFileId();
+          if (bootstrapFilter.shouldLoad(partitionPath, key, fileGroupId)) {
+            partitionCache.bootstrapRecordKey(key, fileGroupId);
+          }
+          visitedCnt.incrementAndGet();
+        });
+        log.info("Bootstrapped partitioned RLI cache for partition {} with {} owned records from total {} RLI records.",
+            partitionPath, partitionCache.size(), visitedCnt.get());
+      }
+      return partitionCache;
+    } catch (Exception e) {
+      // Release the spillable map of the half-built cache before surfacing the failure.
+      partitionCache.close();
+      throw new HoodieException("Failed to bootstrap partitioned RLI cache for partition " + partitionPath, e);
+    }
+  }
+
+  /**
+   * Creates an empty cache for the given partition.
+   *
+   * <p>Before the cache is allocated, other partition caches are evicted if the inferred size
+   * of the new cache would push the held caches over the configured budget.
+   *
+   * @param partitionPath data partition the cache belongs to
+   * @return a new, empty cache for the partition
+   */
+  private BucketCache createBucketCache(String partitionPath) {
+    final long cacheBudget = inferMemorySizeForCache();
+    // Make room first; the partition being created is protected from this eviction round.
+    cleanIfNecessary(cacheBudget, partitionPath);
+    ExternalSpillableMap<String, Integer> backingMap = createSpillableMap(partitionPath, cacheBudget);
+    return newBucketCache(backingMap, Long.MIN_VALUE);
+  }
+
+  private ExternalSpillableMap<String, Integer> createSpillableMap(String 
partitionPath, long inferredCacheSize) {
+    // preserve some memory for fileId dict, assuming the average file group 
number as Short.MAX_VALUE, then the size will be about 1MB.
+    long maxInMemorySizeInBytes = Math.max(inferredCacheSize - 
BucketCache.FILE_ID_DICT_ENTRY_SIZE * Short.MAX_VALUE, 1);
+    try {
+      return new ExternalSpillableMap<>(
+          maxInMemorySizeInBytes,
+          writeConfig.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(),
+          new DefaultSizeEstimator<>(),
+          ExternalSpillableMap.DiskMapType.ROCKS_DB,
+          new DefaultSerializer<>(),
+          
writeConfig.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED),
+          "PartitionedRLICache-" + partitionPath);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to create partitioned RLI cache for 
partition " + partitionPath, e);
+    }
   }
 
   private void reloadMetadataTable() {
+    closeMetadataTable();
     this.metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
         HoodieFlinkEngineContext.DEFAULT,
         metaClient.getStorage(),
@@ -128,16 +221,179 @@ public class RecordLevelIndexBackend implements 
MinibatchIndexBackend {
         conf.get(FlinkOptions.PATH));
   }
 
+  /**
+   * Moves the lazy eviction watermark to the given checkpoint id.
+   *
+   * <p>The watermark may stay where it is or move forward; an id smaller than the current
+   * watermark fails the argument check.
+   *
+   * @param checkpointId new minimum retained checkpoint id
+   */
+  private void updateEvictableCkp(long checkpointId) {
+    final boolean notRegressed = checkpointId >= minRetainedCheckpointId;
+    final String violation = String.format(
+        "The minimum retained checkpoint id should be increased, previous: %s, received: %s",
+        minRetainedCheckpointId, checkpointId);
+    ValidationUtils.checkArgument(notRegressed, violation);
+    this.minRetainedCheckpointId = checkpointId;
+  }
+
+  /**
+   * Estimates the memory size to grant to the next partition cache.
+   *
+   * <p>The estimate is the average heap size of the partition caches currently held. When no
+   * cache is held yet, or that average is not positive, the total cache budget divided by
+   * {@link FlinkOptions#INDEX_RLI_CACHE_CONCURRENT_PARTITIONS_NUM} is used instead.
+   *
+   * @return inferred memory size for the next partition cache
+   */
+  private long inferMemorySizeForCache() {
+    final int expectedPartitions = conf.get(FlinkOptions.INDEX_RLI_CACHE_CONCURRENT_PARTITIONS_NUM);
+    final int heldCaches = partitionBucketCaches.size();
+    if (heldCaches > 0) {
+      final long perCacheHeap = getCurrentHeapSize() / heldCaches;
+      if (perCacheHeap > 0) {
+        return perCacheHeap;
+      }
+    }
+    // Fallback: split the whole budget evenly across the expected concurrent partitions.
+    return maxCacheSizeInBytes / expectedPartitions;
+  }
+
+  /**
+   * Sums the heap size reported by every partition cache currently held.
+   *
+   * @return total heap size of the held caches, {@code 0} when none is held
+   */
+  private long getCurrentHeapSize() {
+    long totalHeapSize = 0L;
+    for (BucketCache partitionCache : partitionBucketCaches.values()) {
+      totalHeapSize += partitionCache.getHeapSize();
+    }
+    return totalHeapSize;
+  }
+
+  /**
+   * Evicts partition caches until the held heap size plus {@code nextCacheSize} fits into the
+   * configured cache budget, or until no evictable cache is left.
+   *
+   * <p>Caches are visited in the iteration order of the access-ordered cache map, i.e. least
+   * recently accessed first. A cache is evicted only when it does not belong to the protected
+   * partition and its last updated checkpoint is older than the minimum retained checkpoint.
+   *
+   * @param nextCacheSize memory size about to be added on top of the held caches
+   * @param protectedPartitionPath partition whose cache must not be evicted by this call
+   */
+  @VisibleForTesting
+  void cleanIfNecessary(long nextCacheSize, String protectedPartitionPath) {
+    Iterator<Map.Entry<String, BucketCache>> iterator = partitionBucketCaches.entrySet().iterator();
+    // A single pass is sufficient: an entry skipped once (protected or too recent) stays
+    // non-evictable for the rest of this call, so rescanning from the head finds nothing new.
+    while (iterator.hasNext() && getCurrentHeapSize() + nextCacheSize > maxCacheSizeInBytes) {
+      Map.Entry<String, BucketCache> entry = iterator.next();
+      // Read key and value before iterator.remove(): the behavior of a map entry is undefined
+      // once the backing map has been modified.
+      String partitionPath = entry.getKey();
+      BucketCache cache = entry.getValue();
+      if (partitionPath.equals(protectedPartitionPath)
+          || cache.lastUpdatedCheckpoint >= minRetainedCheckpointId) {
+        continue;
+      }
+      cache.close();
+      iterator.remove();
+      log.info("Evict partitioned RLI cache for partition {}", partitionPath);
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    this.recordIndexCache.close();
-    if (this.metadataTable == null) {
-      return;
+    partitionBucketCaches.values().forEach(BucketCache::close);
+    partitionBucketCaches.clear();
+    closeMetadataTable();
+  }
+
+  private void closeMetadataTable() {
+    if (metadataTable != null) {
+      try {
+        metadataTable.close();
+      } catch (Exception e) {
+        throw new HoodieException("Failed to close metadata table", e);
+      } finally {
+        metadataTable = null;
+      }
     }
-    try {
-      this.metadataTable.close();
-    } catch (Exception e) {
-      throw new HoodieException("Exception happened during close metadata 
table.", e);
+  }
+
+  /**
+   * Creates a bucket cache with an injected spillable map for tests.
+   *
+   * @param recordKeyToFileGroupIdCode encoded record-key to file-group-id 
mapping
+   * @param lastUpdatedCheckpoint checkpoint id of the latest non-bootstrap 
update
+   * @return bucket cache for one data partition
+   */
+  @VisibleForTesting
+  public BucketCache newBucketCache(
+      ExternalSpillableMap<String, Integer> recordKeyToFileGroupIdCode,
+      long lastUpdatedCheckpoint) {
+    return new BucketCache(recordKeyToFileGroupIdCode, lastUpdatedCheckpoint);
+  }
+
+  /**
+   * Decides whether a record read from the record level index during partition bootstrap is
+   * loaded into this backend's cache.
+   */
+  @FunctionalInterface
+  public interface BootstrapFilter {
+    /**
+     * Tests one record index entry of the partition being bootstrapped.
+     *
+     * @param partitionPath data partition being bootstrapped
+     * @param recordKey record key read from the record index
+     * @param fileId file id taken from the record's index location
+     * @return {@code true} to cache the mapping, {@code false} to skip it
+     */
+    boolean shouldLoad(String partitionPath, String recordKey, String fileId);
+  }
+
+  /**
+   * Bucket Cache of one data table partition.
+   *
+   * <p>The spillable map stores {@code recordKey -> fileGroupIdCode}. File 
group ids are dictionary
+   * encoded inside the cache so hot partitions do not repeat UUID-style file 
group id strings for
+   * every record key entry.
+   */
+  public class BucketCache implements Closeable {
+    @Getter
+    private final ExternalSpillableMap<String, Integer> 
recordKeyToFileGroupIdCode;
+    // Keep file group ids in a partition-local dictionary so the spillable 
record map stores
+    // compact integer codes instead of repeating long UUID-style file group 
id strings.
+    private final Map<String, Integer> fileGroupIdToDictId;
+    private final List<String> dictIdToFileGroupId;
+    private long lastUpdatedCheckpoint;
+    // File group ids generated by bucket assign are UUID-style 36-character 
strings.
+    // Each dictionary entry keeps one file group id and one 4-byte integer 
code.
+    private static final long FILE_ID_DICT_ENTRY_SIZE = 36L + Integer.BYTES;
+
+    BucketCache(ExternalSpillableMap<String, Integer> 
recordKeyToFileGroupIdCode, long lastUpdatedCheckpoint) {
+      this.recordKeyToFileGroupIdCode = recordKeyToFileGroupIdCode;
+      this.fileGroupIdToDictId = new HashMap<>();
+      this.dictIdToFileGroupId = new ArrayList<>();
+      this.lastUpdatedCheckpoint = lastUpdatedCheckpoint;
+    }
+
+    /**
+     * Returns the decoded file group id for the record key, or {@code null} 
when the key is unknown.
+     */
+    String getFileGroupId(String recordKey) {
+      Integer fileGroupIdCode = this.recordKeyToFileGroupIdCode.get(recordKey);
+      return fileGroupIdCode == null ? null : 
dictIdToFileGroupId.get(fileGroupIdCode);
+    }
+
+    /**
+     * Returns the number of cached record-key mappings in this partition.
+     */
+    int size() {
+      return this.recordKeyToFileGroupIdCode.size();
+    }
+
+    private long getHeapSize() {
+      return this.recordKeyToFileGroupIdCode.getCurrentInMemoryMapSize() + 
getFileGroupIdDictHeapSize();
+    }
+
+    /**
+     * Adds a record key mapping from incoming new records.
+     */
+    void putRecordKey(String recordKey, String fileGroupId) {
+      this.recordKeyToFileGroupIdCode.put(recordKey, 
getOrCreateFileGroupIdCode(fileGroupId));
+      this.lastUpdatedCheckpoint = currentCheckpointId;
+    }
+
+    /**
+     * Bootstrap a record key mapping from the RLI data.
+     */
+    void bootstrapRecordKey(String recordKey, String fileGroupId) {
+      this.recordKeyToFileGroupIdCode.put(recordKey, 
getOrCreateFileGroupIdCode(fileGroupId));
+    }
+
+    /**
+     * Returns the dictionary code of a file group id, registering the id on first use.
+     *
+     * <p>A new code is the current dictionary size, so the code is also the position of the
+     * id in the decode list.
+     *
+     * @param fileGroupId file group id to encode
+     * @return the code assigned to the file group id
+     */
+    private int getOrCreateFileGroupIdCode(String fileGroupId) {
+      Integer code = fileGroupIdToDictId.get(fileGroupId);
+      if (code == null) {
+        // First sighting of this file group id: append it and hand out the next free code.
+        code = dictIdToFileGroupId.size();
+        fileGroupIdToDictId.put(fileGroupId, code);
+        dictIdToFileGroupId.add(fileGroupId);
+      }
+      return code;
+    }
+
+    private long getFileGroupIdDictHeapSize() {
+      return dictIdToFileGroupId.size() * FILE_ID_DICT_ENTRY_SIZE;
+    }
+
+    @Override
+    public void close() {
+      this.recordKeyToFileGroupIdCode.close();
+      this.fileGroupIdToDictId.clear();
+      this.dictIdToFileGroupId.clear();
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
index 86d383a3916c..ea45f7da9bd0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
@@ -32,10 +32,10 @@ import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * An implementation of {@link IndexBackend} based on RocksDB.
+ * An implementation of {@link GlobalIndexBackend} based on RocksDB.
  */
 @Slf4j
-public class RocksDBIndexBackend implements IndexBackend {
+public class RocksDBIndexBackend implements GlobalIndexBackend {
   private static final String COLUMN_FAMILY = "index_cache";
 
   private final RocksDBDAO rocksDBDAO;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 47d1f8e9e716..44e9dfd335cc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -56,11 +56,14 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
+import org.apache.hudi.sink.partitioner.DynamicBucketAssignFunction;
+import org.apache.hudi.sink.partitioner.DynamicBucketAssignOperator;
 import org.apache.hudi.sink.partitioner.MiniBatchBucketAssignOperator;
+import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
 import org.apache.hudi.sink.partitioner.RecordIndexPartitioner;
+import org.apache.hudi.sink.partitioner.GlobalRecordIndexPartitioner;
 import org.apache.hudi.sink.partitioner.index.IndexRowUtils;
 import org.apache.hudi.sink.partitioner.index.IndexWriteOperator;
-import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.table.format.FilePathUtils;
 
@@ -126,6 +129,10 @@ public class Pipelines {
     final int PARALLELISM_VALUE = conf.get(FlinkOptions.WRITE_TASKS);
     final boolean isBucketIndexType = OptionsResolver.isBucketIndexType(conf);
 
+    if (OptionsResolver.isRecordLevelIndex(conf)) {
+      throw new HoodieException(
+          "Record level index does not work with bulk insert using FLINK 
engine.");
+    }
     if (isBucketIndexType) {
       // TODO support bulk insert for consistent bucket index
       if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
@@ -285,7 +292,7 @@ public class Pipelines {
     DataStream<HoodieFlinkInternalRow> dataStream1 = 
rowDataToHoodieRecord(conf, rowType, dataStream);
 
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
-      boolean isRliBootstrap = OptionsResolver.isRecordLevelIndex(conf);
+      boolean isRliBootstrap = OptionsResolver.isGlobalRecordLevelIndex(conf);
       dataStream1 = dataStream1
           .transform(
               "index_bootstrap",
@@ -424,7 +431,11 @@ public class Pipelines {
       if (isStreamingIndexWriteEnabled) {
         // index writing pipeline
         SingleOutputStreamOperator<RowData> indexWriteDatastream = 
writeDatastream
-            .partitionCustom(new RecordIndexPartitioner(conf), 
IndexRowUtils::getHoodieKey)
+            .partitionCustom(
+                OptionsResolver.isRecordLevelIndex(conf)
+                    ? new RecordIndexPartitioner(conf)
+                    : new GlobalRecordIndexPartitioner(conf),
+                IndexRowUtils::getHoodieKey)
             .transform(
                 opName("index_write", conf),
                 TypeInformation.of(RowData.class),
@@ -450,15 +461,23 @@ public class Pipelines {
   private static DataStream<HoodieFlinkInternalRow> createBucketAssignStream(
       DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf, 
RowType rowType, String writeOperatorUid) {
     String assignerOperatorName = "bucket_assigner";
-    if (OptionsResolver.isRecordLevelIndex(conf) && 
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+    if (OptionsResolver.isGlobalRecordLevelIndex(conf) && 
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       return inputStream
-          .partitionCustom(new RecordIndexPartitioner(conf), row -> new 
HoodieKey(row.getRecordKey(), row.getPartitionPath()))
           .transform(
               assignerOperatorName,
               new HoodieFlinkInternalRowTypeInfo(rowType),
               new MiniBatchBucketAssignOperator(new 
MinibatchBucketAssignFunction(conf), 
OperatorIDGenerator.fromUid(writeOperatorUid)))
           .uid(opUID(assignerOperatorName, conf))
           .setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
+    } else if (OptionsResolver.isRecordLevelIndex(conf)) {
+      return inputStream
+          .keyBy(HoodieFlinkInternalRow::getRecordKey)
+          .transform(
+              assignerOperatorName,
+              new HoodieFlinkInternalRowTypeInfo(rowType),
+              new DynamicBucketAssignOperator(new 
DynamicBucketAssignFunction(conf), 
OperatorIDGenerator.fromUid(writeOperatorUid)))
+          .uid(opUID(assignerOperatorName, conf))
+          .setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
     } else {
       return inputStream
           // Key-by record key, to avoid multiple subtasks write to a bucket 
at the same time
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/SamplingActionExecutor.java
similarity index 56%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/SamplingActionExecutor.java
index 0b79ee10bcdf..31addf73cd87 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/SamplingActionExecutor.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,21 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.partitioner.index;
-
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+package org.apache.hudi.sink.utils;
 
 /**
- * Index delegator which supports mini-batch index operations.
+ * Executes an action after a fixed number of record updates.
  */
-public interface MinibatchIndexBackend extends IndexBackend {
+public class SamplingActionExecutor {
+  static final int DEFAULT_RECORD_INTERVAL = 10000;
+
+  private final int stepSize;
+  private long recordCnt;
+
+  public SamplingActionExecutor() {
+    this(DEFAULT_RECORD_INTERVAL);
+  }
 
-  Map<String, HoodieRecordGlobalLocation> get(List<String> recordKey) throws 
IOException;
+  public SamplingActionExecutor(int stepSize) {
+    this.stepSize = stepSize;
+  }
 
-  void update(List<Pair<String, HoodieRecordGlobalLocation>> 
recordKeysAndLocations) throws IOException;
-}
+  /**
+   * Counts one record and runs the action when the count reaches a multiple of the step size.
+   * The counter restarts from zero after the action has returned.
+   *
+   * @param action the action to run on every {@code stepSize}-th call
+   */
+  public void runIfNecessary(Runnable action) {
+    recordCnt += 1;
+    boolean intervalReached = recordCnt % stepSize == 0;
+    if (!intervalReached) {
+      return;
+    }
+    action.run();
+    recordCnt = 0L;
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
index 2b0d21baf047..67e20e9e2282 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
@@ -39,6 +39,7 @@ import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.source.ExpressionEvaluators;
 import org.apache.hudi.util.StreamerUtil;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -59,6 +60,7 @@ import java.util.stream.Collectors;
 /**
  * An index support implementation that leverages Record Level Index to prune 
file slices.
  */
+@Slf4j
 public class RecordLevelIndex implements FlinkMetadataIndex {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = 
LoggerFactory.getLogger(RecordLevelIndex.class);
@@ -107,16 +109,20 @@ public class RecordLevelIndex implements 
FlinkMetadataIndex {
     if (!isIndexAvailable()) {
       return fileSlices;
     }
-    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
-        
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
+    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData = null;
     try {
+      recordIndexData =
+          
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
       List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = 
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
       Set<String> fileIds = recordIndexLocations.stream()
           .map(pair -> 
pair.getValue().getFileId()).collect(Collectors.toSet());
       return fileSlices.stream().filter(fileSlice -> 
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
+    } catch (Throwable e) {
+      log.error("Failed to read metadata index: {} for data skipping", 
getIndexPartitionName(), e);
+      return fileSlices;
     } finally {
       // Clean up the RDD to avoid memory leaks
-      recordIndexData.unpersistWithDependencies();
+      
Option.ofNullable(recordIndexData).ifPresent(HoodiePairData::unpersistWithDependencies);
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a08f3f56aacb..e87a2680f185 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -196,16 +196,26 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
       HoodieIndexConfig.INDEX_TYPE.checkValues(indexTypeStr);
     }
     HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
-    if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
-      ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
-          String.format("Metadata table should be enabled when %s is %s.", 
FlinkOptions.INDEX_TYPE.key(), 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
-      
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
-          String.format("Partition level index updating is not supported for 
GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.", 
FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
-
-      boolean deferredRLI = Boolean.parseBoolean(conf.getString(
-          HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), 
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
-      ValidationUtils.checkArgument(!deferredRLI,
-          String.format("Deferred RLI initialization is not supported for 
flink ingestion, please set '%s' = 'false'.", 
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
+    switch (indexType) {
+      case GLOBAL_RECORD_LEVEL_INDEX:
+        ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
+            String.format("Metadata table should be enabled when %s is %s.", 
FlinkOptions.INDEX_TYPE.key(), 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX));
+        
ValidationUtils.checkArgument(conf.get(FlinkOptions.INDEX_GLOBAL_ENABLED),
+            String.format("Partition level index updating is not supported for 
GLOBAL_RECORD_LEVEL_INDEX, please set '%s' = 'true'.", 
FlinkOptions.INDEX_GLOBAL_ENABLED.key()));
+
+        boolean deferredRLI = Boolean.parseBoolean(conf.getString(
+            HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key(), 
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.defaultValue().toString()));
+        ValidationUtils.checkArgument(!deferredRLI,
+            String.format("Deferred RLI initialization is not supported for 
flink ingestion, please set '%s' = 'false'.", 
HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key()));
+        break;
+      case RECORD_LEVEL_INDEX:
+        ValidationUtils.checkArgument(OptionsResolver.isUpsertOperation(conf) 
|| OptionsResolver.isInsertOverwrite(conf),
+            "Partitioned record level index supports only Flink streaming 
upsert and insert overwrite.");
+        
ValidationUtils.checkArgument(!OptionsResolver.isNonBlockingConcurrencyControl(conf),
+            "Partitioned record level index does not support non-blocking 
concurrency control.");
+        break;
+      default:
+        break;
     }
   }
 
@@ -445,6 +455,14 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
       if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
         conf.set(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE, 
OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4);
       }
+    } else if (indexType == HoodieIndex.IndexType.RECORD_LEVEL_INDEX) {
+      
conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key(), 
"true");
+      conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
+      conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), 
"true");
+      conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
+      if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
+        conf.set(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE, 
OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4);
+      }
     } else {
       conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), 
"false");
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index 39a4ee93d7ec..1383722eb0a8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
 
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
@@ -52,6 +53,24 @@ public class TestOptionsResolver {
     assertEquals(HoodieIndex.IndexType.BLOOM, 
OptionsResolver.getIndexType(conf));
   }
 
+  @Test
+  void testRecordLevelIndexStreamingWrite() {
+    Configuration conf = getConf();
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name());
+
+    assertTrue(OptionsResolver.isRecordLevelIndex(conf));
+    assertTrue(OptionsResolver.isStreamingIndexWriteEnabled(conf));
+
+    conf.set(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE.value());
+    assertFalse(OptionsResolver.isStreamingIndexWriteEnabled(conf));
+
+    conf.set(FlinkOptions.OPERATION, WriteOperationType.UPSERT.value());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    assertFalse(OptionsResolver.isRecordLevelIndex(conf));
+    assertFalse(OptionsResolver.isStreamingIndexWriteEnabled(conf));
+  }
+
   @Test
   void testIsLazyFailedWritesCleanPolicy() {
     Configuration conf = new Configuration();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
similarity index 94%
copy from 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
copy to 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
index 54bc3a4e8ccb..32163baf38d2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
@@ -34,11 +34,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Test cases for {@link RecordIndexPartitioner}.
+ * Test cases for {@link GlobalRecordIndexPartitioner}.
  */
-public class TestRecordIndexPartitioner {
+public class TestGlobalRecordIndexPartitioner {
 
-  private static RecordIndexPartitioner partitioner;
+  private static GlobalRecordIndexPartitioner partitioner;
 
   @TempDir
   static File tempFile;
@@ -49,7 +49,7 @@ public class TestRecordIndexPartitioner {
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
-    partitioner = new RecordIndexPartitioner(conf);
+    partitioner = new GlobalRecordIndexPartitioner(conf);
   }
 
   @Test
@@ -104,4 +104,4 @@ public class TestRecordIndexPartitioner {
     int partition = partitioner.partition(key, numPartitions);
     assertTrue(partition >= 0 && partition < numPartitions);
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
index 54bc3a4e8ccb..59cd3d93fb0b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestRecordIndexPartitioner.java
@@ -20,88 +20,90 @@ package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
-import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Test cases for {@link RecordIndexPartitioner}.
+ * Test cases for partitioned record level index routing in {@link 
RecordIndexPartitioner}.
  */
 public class TestRecordIndexPartitioner {
-
-  private static RecordIndexPartitioner partitioner;
+  private static final int FILE_GROUP_COUNT = 3;
+  private static final int NUM_PARTITIONS = 4;
 
   @TempDir
-  static File tempFile;
-
-  @BeforeAll
-  public static void beforeAll() throws Exception {
-    final String basePath = tempFile.getAbsolutePath();
-    Configuration conf = TestConfigurations.getDefaultConf(basePath);
-    
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
-    TestData.writeData(TestData.DATA_SET_INSERT, conf);
-    partitioner = new RecordIndexPartitioner(conf);
-  }
+  File tempFile;
 
   @Test
-  void testPartitionMethod() {
-    // Test partitioning with different record keys
-    HoodieKey key1 = new HoodieKey("record_key_1", "partition_path");
-    HoodieKey key2 = new HoodieKey("record_key_2", "partition_path");
-    HoodieKey key3 = new HoodieKey("another_record_key", "partition_path");
-    
-    int numPartitions = 10;
-    
-    // Test that partitioning works consistently
-    int partition1 = partitioner.partition(key1, numPartitions);
-    int partition2 = partitioner.partition(key2, numPartitions);
-    int partition3 = partitioner.partition(key3, numPartitions);
-    
-    // Each partition should be within the range [0, numPartitions)
-    assertTrue(partition1 >= 0 && partition1 < numPartitions);
-    assertTrue(partition2 >= 0 && partition2 < numPartitions);
-    assertTrue(partition3 >= 0 && partition3 < numPartitions);
-    
-    // Same key should always map to the same partition
-    assertEquals(partitioner.partition(key1, numPartitions), 
partitioner.partition(key1, numPartitions));
+  void testNewPartitionUsesMinFileGroupCount() throws Exception {
+    RecordIndexPartitioner partitioner = newPartitioner();
+    String recordKey = "record_key";
+
+    assertEquals(
+        expectedPartition(recordKey, "par1"),
+        partitioner.partition(new HoodieKey(recordKey, "par1"), 
NUM_PARTITIONS));
+    assertEquals(
+        expectedPartition(recordKey, "par2"),
+        partitioner.partition(new HoodieKey(recordKey, "par2"), 
NUM_PARTITIONS));
   }
 
   @Test
-  void testPartitionConsistency() {
+  void testPartitionConsistency() throws Exception {
+    RecordIndexPartitioner partitioner = newPartitioner();
     HoodieKey key = new HoodieKey("consistent_test_key", "partition_path");
-    int numPartitions = 5;
-    
-    // Test that the same key always maps to the same partition
-    int expectedPartition = partitioner.partition(key, numPartitions);
+
+    int expectedPartition = partitioner.partition(key, NUM_PARTITIONS);
     for (int i = 0; i < 10; i++) {
-      assertEquals(expectedPartition, partitioner.partition(key, 
numPartitions));
+      assertEquals(expectedPartition, partitioner.partition(key, 
NUM_PARTITIONS));
     }
   }
 
   @Test
-  void testEdgeCaseSinglePartition() {
-    HoodieKey key = new HoodieKey("any_key", "partition_path");
-    int numPartitions = 1; // Single partition
-    
-    // With single partition, everything should go to partition 0
-    assertEquals(0, partitioner.partition(key, numPartitions));
+  void testPartitionWithinRange() throws Exception {
+    RecordIndexPartitioner partitioner = newPartitioner();
+
+    int partition1 = partitioner.partition(new HoodieKey("record_key", 
"par1"), NUM_PARTITIONS);
+    int partition2 = partitioner.partition(new HoodieKey("record_key", 
"par2"), NUM_PARTITIONS);
+    int partition3 = partitioner.partition(new HoodieKey("record_key", 
"par3"), NUM_PARTITIONS);
+
+    assertTrue(partition1 >= 0 && partition1 < NUM_PARTITIONS);
+    assertTrue(partition2 >= 0 && partition2 < NUM_PARTITIONS);
+    assertTrue(partition3 >= 0 && partition3 < NUM_PARTITIONS);
+
+    assertNotEquals(partition1, partition2);
+    assertNotEquals(partition1, partition3);
   }
 
   @Test
-  void testLargeNumberOfPartitions() {
-    HoodieKey key = new HoodieKey("test_key_for_large_partition", 
"partition_path");
-    int numPartitions = 100; // Large number of partitions
-    
-    int partition = partitioner.partition(key, numPartitions);
-    assertTrue(partition >= 0 && partition < numPartitions);
+  void testSinglePartition() throws Exception {
+    RecordIndexPartitioner partitioner = newPartitioner();
+
+    assertEquals(0, partitioner.partition(new HoodieKey("any_key", "par1"), 
1));
+  }
+
+  private RecordIndexPartitioner newPartitioner() throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name());
+    
conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
 String.valueOf(FILE_GROUP_COUNT));
+    StreamerUtil.initTableIfNotExists(conf);
+    return new RecordIndexPartitioner(conf);
+  }
+
+  private int expectedPartition(String recordKey, String partitionPath) {
+    int recordIndexFileGroup = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(recordKey, 
FILE_GROUP_COUNT);
+    return 
BucketIndexUtil.getPartitionIndexFunc(NUM_PARTITIONS).apply(FILE_GROUP_COUNT, 
partitionPath, recordIndexFileGroup);
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
similarity index 69%
copy from 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
copy to 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 70011b514ee7..396a57185cec 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -49,9 +49,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Test cases for {@link RecordLevelIndexBackend}.
+ * Test cases for {@link GlobalRecordLevelIndexBackend}.
  */
-public class TestRecordLevelIndexBackend {
+public class TestGlobalRecordLevelIndexBackend {
 
   private Configuration conf;
 
@@ -72,34 +72,34 @@ public class TestRecordLevelIndexBackend {
 
     String firstCommitTime = 
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
 
-    try (RecordLevelIndexBackend recordLevelIndexBackend = new 
RecordLevelIndexBackend(conf, -1)) {
+    try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
       // get record location
-      HoodieRecordGlobalLocation location = recordLevelIndexBackend.get("id1");
+      HoodieRecordGlobalLocation location = 
globalRecordLevelIndexBackend.get("id1");
       assertNotNull(location);
       assertEquals("par1", location.getPartitionPath());
       assertEquals(firstCommitTime, location.getInstantTime());
 
       // get record location with non existed key
-      location = recordLevelIndexBackend.get("new_key");
+      location = globalRecordLevelIndexBackend.get("new_key");
       assertNull(location);
 
       // get records locations for multiple record keys
-      Map<String, HoodieRecordGlobalLocation> locations = 
recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
+      Map<String, HoodieRecordGlobalLocation> locations = 
globalRecordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
       assertEquals(3, locations.size());
       locations.values().forEach(Assertions::assertNotNull);
 
       // get records locations for multiple record keys with unexisted key
-      locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2", 
"new_key"));
+      locations = globalRecordLevelIndexBackend.get(Arrays.asList("id1", 
"id2", "new_key"));
       assertEquals(3, locations.size());
       assertNull(locations.get("new_key"));
 
       // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(1);
+      globalRecordLevelIndexBackend.onCheckpoint(1);
 
       // update record location
       HoodieRecordGlobalLocation newLocation = new 
HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
-      recordLevelIndexBackend.update("new_key", newLocation);
-      location = recordLevelIndexBackend.get("new_key");
+      globalRecordLevelIndexBackend.update("new_key", newLocation);
+      location = globalRecordLevelIndexBackend.get("new_key");
       assertEquals(newLocation, location);
 
       // previous instant commit success, clean
@@ -107,12 +107,12 @@ public class TestRecordLevelIndexBackend {
       Map<Long, String> inflightInstants = new HashMap<>();
       inflightInstants.put(1L, "0001");
       
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
-      assertEquals(2, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+      assertEquals(2, 
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
       // the cache contains 'new_key', and other old locations
-      location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
+      location = 
globalRecordLevelIndexBackend.getRecordIndexCache().get("new_key");
       assertEquals(newLocation, location);
-      location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
+      location = 
globalRecordLevelIndexBackend.getRecordIndexCache().get("id1");
       assertNotNull(location);
     }
   }
@@ -122,69 +122,69 @@ public class TestRecordLevelIndexBackend {
     // set a small value for RLI cache
     conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
 
-    try (RecordLevelIndexBackend recordLevelIndexBackend = new 
RecordLevelIndexBackend(conf, -1)) {
+    try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
 
       for (int i = 0; i < 1500; i++) {
-        recordLevelIndexBackend.update("id1_" + i,
+        globalRecordLevelIndexBackend.update("id1_" + i,
             new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
       }
       // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(1);
+      globalRecordLevelIndexBackend.onCheckpoint(1);
       Correspondent correspondent = mock(Correspondent.class);
       Map<Long, String> inflightInstants = new HashMap<>();
       inflightInstants.put(1L, "0001");
       
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
+      globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
 
       for (int i = 0; i < 2000; i++) {
-        recordLevelIndexBackend.update("id2_" + i,
+        globalRecordLevelIndexBackend.update("id2_" + i,
             new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
       }
 
       // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(2);
+      globalRecordLevelIndexBackend.onCheckpoint(2);
       correspondent = mock(Correspondent.class);
       inflightInstants = new HashMap<>();
       inflightInstants.put(2L, "0002");
       
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
+      globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
 
       for (int i = 0; i < 2000; i++) {
-        recordLevelIndexBackend.update("id3_" + i,
+        globalRecordLevelIndexBackend.update("id3_" + i,
             new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
       }
 
       // the cache for the first instant is evicted
-      assertEquals(2, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      assertEquals(2, 
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
 
       // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(3);
+      globalRecordLevelIndexBackend.onCheckpoint(3);
       correspondent = mock(Correspondent.class);
       inflightInstants = new HashMap<>();
       inflightInstants.put(3L, "0003");
       
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
+      globalRecordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
 
       for (int i = 0; i < 500; i++) {
-        recordLevelIndexBackend.update("id4_" + i,
+        globalRecordLevelIndexBackend.update("id4_" + i,
             new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
       }
 
-      assertEquals(3, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      assertEquals(3, 
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
 
       // insert another batch of records, which will trigger the cleaning of 
the cache.
       // another cache clean is triggered
       for (int i = 500; i < 1500; i++) {
-        recordLevelIndexBackend.update("id4_" + i,
+        globalRecordLevelIndexBackend.update("id4_" + i,
             new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
       }
-      assertEquals(3, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      assertEquals(3, 
globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().size());
       // cache for the oldest ckp id will be cleaned
-      
assertNull(recordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
+      
assertNull(globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
       // caches for the latest 3 ckp id still in the cache
-      assertEquals("par1", 
recordLevelIndexBackend.get("id2_0").getPartitionPath());
-      assertEquals("par1", 
recordLevelIndexBackend.get("id3_0").getPartitionPath());
-      assertEquals("par1", 
recordLevelIndexBackend.get("id4_0").getPartitionPath());
+      assertEquals("par1", 
globalRecordLevelIndexBackend.get("id2_0").getPartitionPath());
+      assertEquals("par1", 
globalRecordLevelIndexBackend.get("id3_0").getPartitionPath());
+      assertEquals("par1", 
globalRecordLevelIndexBackend.get("id4_0").getPartitionPath());
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index 70011b514ee7..c29c256bbaa9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -18,34 +18,27 @@
 
 package org.apache.hudi.sink.partitioner.index;
 
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
-import org.apache.hudi.utils.TestData;
-import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
-import static org.mockito.Mockito.mock;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
 /**
@@ -53,138 +46,145 @@ import static org.mockito.Mockito.when;
  */
 public class TestRecordLevelIndexBackend {
 
+  private static final long ONE_MB = 1024 * 1024;
+
   private Configuration conf;
 
   @TempDir
   File tempFile;
 
   @BeforeEach
-  void beforeEach() throws IOException {
+  public void before() throws Exception {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    conf.set(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE.name());
-    
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
+    conf.setString("hadoop.fs.defaultFS", "file:///");
+    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
     StreamerUtil.initTableIfNotExists(conf);
   }
 
   @Test
-  void testRecordLevelIndexBackend() throws Exception {
-    TestData.writeData(TestData.DATA_SET_INSERT, conf);
-
-    String firstCommitTime = 
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
-
-    try (RecordLevelIndexBackend recordLevelIndexBackend = new 
RecordLevelIndexBackend(conf, -1)) {
-      // get record location
-      HoodieRecordGlobalLocation location = recordLevelIndexBackend.get("id1");
-      assertNotNull(location);
-      assertEquals("par1", location.getPartitionPath());
-      assertEquals(firstCommitTime, location.getInstantTime());
-
-      // get record location with non existed key
-      location = recordLevelIndexBackend.get("new_key");
-      assertNull(location);
-
-      // get records locations for multiple record keys
-      Map<String, HoodieRecordGlobalLocation> locations = 
recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
-      assertEquals(3, locations.size());
-      locations.values().forEach(Assertions::assertNotNull);
-
-      // get records locations for multiple record keys with unexisted key
-      locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2", 
"new_key"));
-      assertEquals(3, locations.size());
-      assertNull(locations.get("new_key"));
-
-      // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(1);
-
-      // update record location
-      HoodieRecordGlobalLocation newLocation = new 
HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
-      recordLevelIndexBackend.update("new_key", newLocation);
-      location = recordLevelIndexBackend.get("new_key");
-      assertEquals(newLocation, location);
-
-      // previous instant commit success, clean
-      Correspondent correspondent = mock(Correspondent.class);
-      Map<Long, String> inflightInstants = new HashMap<>();
-      inflightInstants.put(1L, "0001");
-      
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
-      assertEquals(2, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-      // the cache contains 'new_key', and other old locations
-      location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
-      assertEquals(newLocation, location);
-      location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
-      assertNotNull(location);
+  public void testCheckpointCompleteDoesNotEagerEvict() throws Exception {
+    try (RecordLevelIndexBackend backend = createBackend()) {
+      backend.getPartitionBucketCaches().put("par1", 
cacheWithHeapSize(backend, 2 * ONE_MB, 1L));
+
+      backend.onCheckpointComplete(new 
TestCorrespondent(Collections.emptyMap()), 2L);
+
+      assertTrue(backend.getPartitionBucketCaches().containsKey("par1"));
+    }
+  }
+
+  @Test
+  public void testLazyEvictOldPartitionsWhenHeapExceedsLimit() throws 
Exception {
+    try (RecordLevelIndexBackend backend = createBackend()) {
+      backend.getPartitionBucketCaches().put("par1", 
cacheWithHeapSize(backend, 800 * 1024, 1L));
+      backend.getPartitionBucketCaches().put("par2", 
cacheWithHeapSize(backend, 800 * 1024, 1L));
+      backend.onCheckpointComplete(new 
TestCorrespondent(Collections.emptyMap()), 2L);
+      backend.getPartitionBucketCaches().put("par3", 
cacheWithHeapSize(backend, 1L, 2L));
+
+      backend.cleanIfNecessary(0L, "par3");
+
+      assertFalse(backend.getPartitionBucketCaches().containsKey("par1"));
+      assertTrue(backend.getPartitionBucketCaches().containsKey("par2"));
+      assertTrue(backend.getPartitionBucketCaches().containsKey("par3"));
     }
   }
 
   @Test
-  void testRecordLevelIndexCacheClean() throws Exception {
-    // set a small value for RLI cache
-    conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
+  public void testLazyEvictKeepsRecentPartition() throws Exception {
+    try (RecordLevelIndexBackend backend = createBackend()) {
+      backend.getPartitionBucketCaches().put("old", cacheWithHeapSize(backend, 
800 * 1024, 1L));
+      backend.getPartitionBucketCaches().put("recent", 
cacheWithHeapSize(backend, 800 * 1024, 2L));
+      backend.onCheckpointComplete(new 
TestCorrespondent(Collections.emptyMap()), 2L);
+
+      backend.cleanIfNecessary(0L, null);
+
+      assertFalse(backend.getPartitionBucketCaches().containsKey("old"));
+      assertTrue(backend.getPartitionBucketCaches().containsKey("recent"));
+    }
+  }
+
+  @Test
+  public void testGetDoesNotRefreshLastUpdatedCheckpoint() throws Exception {
+    try (RecordLevelIndexBackend backend = createBackend()) {
+      backend.getPartitionBucketCaches().put("old", cacheWithHeapSize(backend, 
2 * ONE_MB, 1L));
+      backend.onCheckpoint(2L);
+
+      backend.get("old", "key1");
+      backend.onCheckpointComplete(new 
TestCorrespondent(Collections.emptyMap()), 2L);
+      backend.cleanIfNecessary(0L, null);
+
+      assertFalse(backend.getPartitionBucketCaches().containsKey("old"));
+    }
+  }
+
+  @Test
+  public void testPartitionCacheDictionaryEncodesFileGroupId() throws 
Exception {
+    try (RecordLevelIndexBackend backend = createBackend()) {
+      ExternalSpillableMap<String, Integer> recordKeyToFileGroupIdCode = 
mapWithStorage(0L);
+      RecordLevelIndexBackend.BucketCache cache = 
backend.newBucketCache(recordKeyToFileGroupIdCode, 1L);
+
+      cache.putRecordKey("key1", "file-group-id-000000000000000000000001");
+      cache.putRecordKey("key2", "file-group-id-000000000000000000000001");
+      cache.putRecordKey("key3", "file-group-id-000000000000000000000002");
+
+      assertEquals("file-group-id-000000000000000000000001", 
cache.getFileGroupId("key1"));
+      assertEquals("file-group-id-000000000000000000000001", 
cache.getFileGroupId("key2"));
+      assertEquals("file-group-id-000000000000000000000002", 
cache.getFileGroupId("key3"));
+      assertEquals(Integer.valueOf(0), recordKeyToFileGroupIdCode.get("key1"));
+      assertEquals(Integer.valueOf(0), recordKeyToFileGroupIdCode.get("key2"));
+      assertEquals(Integer.valueOf(1), recordKeyToFileGroupIdCode.get("key3"));
+    }
+  }
+
+  @Test
+  public void testLazyEvictUsesAccessOrder() throws Exception {
+    try (RecordLevelIndexBackend backend = createBackend()) {
+      backend.getPartitionBucketCaches().put("par1", 
cacheWithHeapSize(backend, 500 * 1024, 1L));
+      backend.getPartitionBucketCaches().put("par2", 
cacheWithHeapSize(backend, 500 * 1024, 1L));
+      backend.getPartitionBucketCaches().get("par1");
+      backend.getPartitionBucketCaches().put("par3", 
cacheWithHeapSize(backend, 100 * 1024, 2L));
+      backend.onCheckpointComplete(new 
TestCorrespondent(Collections.emptyMap()), 2L);
+
+      backend.cleanIfNecessary(0L, "par3");
+
+      assertTrue(backend.getPartitionBucketCaches().containsKey("par1"));
+      assertFalse(backend.getPartitionBucketCaches().containsKey("par2"));
+      assertTrue(backend.getPartitionBucketCaches().containsKey("par3"));
+    }
+  }
+
+  private RecordLevelIndexBackend createBackend() {
+    return new RecordLevelIndexBackend(conf, (partitionPath, recordKey, 
fileId) -> true);
+  }
+
+  private RecordLevelIndexBackend.BucketCache cacheWithHeapSize(
+      RecordLevelIndexBackend indexBackend,
+      long heapSize,
+      long lastUpdatedCheckpoint) {
+    return indexBackend.newBucketCache(mapWithStorage(heapSize), 
lastUpdatedCheckpoint);
+  }
+
+  private ExternalSpillableMap<String, Integer> mapWithStorage(long heapSize) {
+    Map<String, Integer> storage = new HashMap<>();
+    ExternalSpillableMap<String, Integer> recordKeyToFileGroupIdCode = 
Mockito.mock(ExternalSpillableMap.class);
+    
when(recordKeyToFileGroupIdCode.getCurrentInMemoryMapSize()).thenReturn(heapSize);
+    when(recordKeyToFileGroupIdCode.size()).thenAnswer(invocation -> 
storage.size());
+    when(recordKeyToFileGroupIdCode.get(Mockito.anyString()))
+        .thenAnswer(invocation -> storage.get(invocation.getArgument(0)));
+    doAnswer(invocation -> storage.put(invocation.getArgument(0), 
invocation.getArgument(1)))
+        .when(recordKeyToFileGroupIdCode).put(Mockito.anyString(), 
Mockito.anyInt());
+    return recordKeyToFileGroupIdCode;
+  }
+
+  private static class TestCorrespondent extends Correspondent {
+    private final Map<Long, String> inflightInstants;
+
+    TestCorrespondent(Map<Long, String> inflightInstants) {
+      this.inflightInstants = inflightInstants;
+    }
 
-    try (RecordLevelIndexBackend recordLevelIndexBackend = new 
RecordLevelIndexBackend(conf, -1)) {
-
-      for (int i = 0; i < 1500; i++) {
-        recordLevelIndexBackend.update("id1_" + i,
-            new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
-      }
-      // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(1);
-      Correspondent correspondent = mock(Correspondent.class);
-      Map<Long, String> inflightInstants = new HashMap<>();
-      inflightInstants.put(1L, "0001");
-      
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 1);
-
-      for (int i = 0; i < 2000; i++) {
-        recordLevelIndexBackend.update("id2_" + i,
-            new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
-      }
-
-      // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(2);
-      correspondent = mock(Correspondent.class);
-      inflightInstants = new HashMap<>();
-      inflightInstants.put(2L, "0002");
-      
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 2);
-
-      for (int i = 0; i < 2000; i++) {
-        recordLevelIndexBackend.update("id3_" + i,
-            new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
-      }
-
-      // the cache for the first instant is evicted
-      assertEquals(2, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-
-      // new checkpoint
-      recordLevelIndexBackend.onCheckpoint(3);
-      correspondent = mock(Correspondent.class);
-      inflightInstants = new HashMap<>();
-      inflightInstants.put(3L, "0003");
-      
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
-      recordLevelIndexBackend.onCheckpointComplete(correspondent, 3);
-
-      for (int i = 0; i < 500; i++) {
-        recordLevelIndexBackend.update("id4_" + i,
-            new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
-      }
-
-      assertEquals(3, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-
-      // insert another batch of records, which will trigger the cleaning of 
the cache.
-      // another cache clean is triggered
-      for (int i = 500; i < 1500; i++) {
-        recordLevelIndexBackend.update("id4_" + i,
-            new HoodieRecordGlobalLocation("par1", "000000001", 
UUID.randomUUID().toString(), -1));
-      }
-      assertEquals(3, 
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
-      // cache for the oldest ckp id will be cleaned
-      
assertNull(recordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
-      // caches for the latest 3 ckp id still in the cache
-      assertEquals("par1", 
recordLevelIndexBackend.get("id2_0").getPartitionPath());
-      assertEquals("par1", 
recordLevelIndexBackend.get("id3_0").getPartitionPath());
-      assertEquals("par1", 
recordLevelIndexBackend.get("id4_0").getPartitionPath());
+    @Override
+    public Map<Long, String> requestInflightInstants() {
+      return inflightInstants;
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 5318e4381bc3..a383d66dbaec 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -36,7 +36,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.index.IndexBackend;
-import org.apache.hudi.sink.partitioner.index.RecordLevelIndexBackend;
+import org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -366,8 +366,8 @@ public class TestWriteBase {
      */
     public TestHarness assertInflightCachesOfBucketAssigner(int expected) {
       IndexBackend indexBackend = 
pipeline.getBucketAssignFunction().getIndexBackend();
-      if (indexBackend instanceof RecordLevelIndexBackend) {
-        assertEquals(expected, ((RecordLevelIndexBackend) 
indexBackend).getRecordIndexCache().getCaches().size());
+      if (indexBackend instanceof GlobalRecordLevelIndexBackend) {
+        assertEquals(expected, ((GlobalRecordLevelIndexBackend) 
indexBackend).getRecordIndexCache().getCaches().size());
       }
       return this;
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
new file mode 100644
index 000000000000..83b5b7f73cdf
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.utils.TestConfigurations.sql;
+import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for Flink streaming writes with dynamic bucket index.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestDynamicBucketStreamWrite {
+  private TableEnvironment streamTableEnv;
+  private TableEnvironment batchTableEnv;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  void beforeEach() {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+    streamTableEnv = TableEnvironmentImpl.create(settings);
+    streamTableEnv.getConfig().getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
4);
+    Configuration execConf = streamTableEnv.getConfig().getConfiguration();
+    execConf.setString("execution.checkpointing.interval", "2s");
+    execConf.setString("restart-strategy", "fixed-delay");
+    execConf.setString("restart-strategy.fixed-delay.attempts", "0");
+
+    batchTableEnv = TestTableEnvs.getBatchTableEnv();
+    batchTableEnv.getConfig().getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
4);
+  }
+
+  @ParameterizedTest
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testWriteAndBootstrapFromRecordIndex(HoodieTableType tableType, boolean 
partitioned) {
+    streamTableEnv.executeSql(getTableDDL("t1", tableType, partitioned));
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2')";
+    execInsertSql(streamTableEnv, insertInto1);
+
+    Map<String, String> initialFileIds = collectFileIds(streamTableEnv, "uuid 
in ('id1', 'id2')");
+    assertThat(initialFileIds).containsOnlyKeys("id1", "id2");
+    assertRecordIndexMetadataPartitionExists();
+
+    final String insertInto2 = "insert into t1 values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:05','par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:06','par1'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:07','par2')";
+    execInsertSql(streamTableEnv, insertInto2);
+
+    List<Row> result = execSelectSql(streamTableEnv, "select uuid, name, age, 
ts, `partition` from t1");
+    assertRowsEquals(result, "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:05, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:06, par1], "
+        + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
+        + "+I[id5, Sophia, 18, 1970-01-01T00:00:07, par2]]");
+
+    Map<String, String> updatedFileIds = collectFileIds(streamTableEnv, "uuid 
in ('id1', 'id2')");
+    assertEquals(initialFileIds, updatedFileIds);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testInsertOverwrite(HoodieTableType tableType) {
+    batchTableEnv.executeSql(getTableDDL("t1", tableType));
+    execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+    String selectPhysicalColumns = "select uuid, name, age, ts, `partition` 
from t1";
+
+    final String insertOverwritePartition = "insert overwrite t1 
partition(`partition`='par1') values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n";
+    execInsertSql(batchTableEnv, insertOverwritePartition);
+    assertRowsEquals(execSelectSql(batchTableEnv, selectPhysicalColumns), 
TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
+
+    execInsertSql(batchTableEnv, insertOverwritePartition);
+    assertRowsEquals(execSelectSql(batchTableEnv, selectPhysicalColumns), 
TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
+
+    final String insertOverwriteDynamicPartition = "insert overwrite t1 /*+ 
OPTIONS('write.partition.overwrite.mode'='dynamic') */ values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+    execInsertSql(batchTableEnv, insertOverwriteDynamicPartition);
+    assertRowsEquals(execSelectSql(batchTableEnv, selectPhysicalColumns), 
TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
+
+    execInsertSql(batchTableEnv, insertOverwriteDynamicPartition);
+    assertRowsEquals(execSelectSql(batchTableEnv, selectPhysicalColumns), 
TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
+
+    final String insertOverwriteTable = "insert overwrite t1 values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+    final String expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
+    execInsertSql(batchTableEnv, insertOverwriteTable);
+    assertRowsEquals(execSelectSql(batchTableEnv, selectPhysicalColumns), 
expected);
+
+    execInsertSql(batchTableEnv, insertOverwriteTable);
+    assertRowsEquals(execSelectSql(batchTableEnv, selectPhysicalColumns), 
expected);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) {
+    streamTableEnv.executeSql(getTableDDL(
+        "t1", tableType, 
Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(),
 "1"), true));
+
+    execInsertSql(streamTableEnv, "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par_scale'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par_scale'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par_scale')");
+
+    execInsertSql(streamTableEnv, "insert into t1 values\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par_scale'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par_scale'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par_scale'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par_scale')");
+
+    Map<String, String> fileIdsByRecordKey = collectFileIds(streamTableEnv, 
"`partition` = 'par_scale'");
+    assertThat(fileIdsByRecordKey).hasSize(8);
+    Set<String> fileIds = new HashSet<>(fileIdsByRecordKey.values());
+    assertThat(fileIds).hasSizeGreaterThan(2);
+  }
+
+  /**
+   * Returns the test params as (HoodieTableType, partitioned) pairs.
+   */
+  private static Stream<Arguments> tableTypeAndBooleanTrueFalseParams() {
+    Object[][] data =
+        new Object[][] {
+            {HoodieTableType.COPY_ON_WRITE, false},
+            {HoodieTableType.COPY_ON_WRITE, true},
+            {HoodieTableType.MERGE_ON_READ, false},
+            {HoodieTableType.MERGE_ON_READ, true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private String getTableDDL(String tableName, HoodieTableType tableType) {
+    return getTableDDL(tableName, tableType, Collections.emptyMap(), true);
+  }
+
+  private String getTableDDL(String tableName, HoodieTableType tableType, 
boolean partitioned) {
+    return getTableDDL(tableName, tableType, Collections.emptyMap(), 
partitioned);
+  }
+
+  private String getTableDDL(String tableName, HoodieTableType tableType, 
Map<String, String> extraOptions, boolean partitioned) {
+    TestConfigurations.Sql ddl = sql(tableName)
+        .field("_hoodie_file_name STRING METADATA VIRTUAL")
+        .field("uuid varchar(20)")
+        .field("name varchar(10)")
+        .field("age int")
+        .field("ts timestamp(3)")
+        .field("`partition` varchar(20)")
+        .options(getDefaultKeys())
+        .options(extraOptions)
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name())
+        .option(FlinkOptions.BUCKET_ASSIGN_TASKS, 2)
+        .option(FlinkOptions.WRITE_TASKS, 2)
+        .option(FlinkOptions.INDEX_WRITE_TASKS, 2);
+    if (!partitioned) {
+      ddl.noPartition();
+    }
+    return ddl.end();
+  }
+
+  private static Map<String, String> getDefaultKeys() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
+    return conf.toMap();
+  }
+
+  private void assertRecordIndexMetadataPartitionExists() {
+    org.apache.hadoop.conf.Configuration hadoopConf = new 
org.apache.hadoop.conf.Configuration();
+    String mdtBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
+    assertTrue(StreamerUtil.tableExists(mdtBasePath, hadoopConf),
+        "Metadata table should exist for table with dynamic bucket index");
+    assertTrue(StreamerUtil.partitionExists(mdtBasePath, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), hadoopConf),
+        "RECORD_INDEX partition should exist for table with dynamic bucket 
index");
+  }
+
+  private Map<String, String> collectFileIds(TableEnvironment tableEnv, String 
condition) {
+    List<Row> rows = execSelectSql(tableEnv, "select uuid, _hoodie_file_name 
from t1 where " + condition);
+    return rows.stream().collect(Collectors.toMap(
+        row -> row.getField(0).toString(),
+        row -> FSUtils.getFileId(row.getField(1).toString())));
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tableEnv, String select) {
+    return CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(select).execute().collect());
+  }
+
+  private void execInsertSql(TableEnvironment tableEnv, String insert) {
+    TableResult tableResult = tableEnv.executeSql(insert);
+    try {
+      tableResult.await();
+    } catch (InterruptedException | ExecutionException ex) {
+      throw new AssertionError("Failed to execute insert SQL: " + insert, ex);
+    }
+  }
+}


Reply via email to