This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5de762f6b2bc feat(flink): Support data skipping based on partitioned 
RLI (#19006)
5de762f6b2bc is described below

commit 5de762f6b2bc83a09fb82bade67554165f4d3392
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 18 14:23:18 2026 +0800

    feat(flink): Support data skipping based on partitioned RLI (#19006)
---
 .../java/org/apache/hudi/source/FileIndex.java     |   8 +-
 ...rdLevelIndex.java => BaseRecordLevelIndex.java} |  64 +++--
 .../hudi/source/stats/GlobalRecordLevelIndex.java  |  54 ++++
 .../apache/hudi/source/stats/RecordLevelIndex.java | 310 +++++++--------------
 .../java/org/apache/hudi/source/TestFileIndex.java |  67 +++++
 .../hudi/source/stats/TestRecordLevelIndex.java    | 221 ++++++++++++++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  48 ++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  11 +
 8 files changed, 534 insertions(+), 249 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 75d00ff136e8..23aa83b3ba40 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -29,8 +29,8 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.source.stats.BaseRecordLevelIndex;
 import org.apache.hudi.source.stats.FileStatsIndex;
-import org.apache.hudi.source.stats.RecordLevelIndex;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.util.StreamerUtil;
@@ -73,7 +73,7 @@ public class FileIndex implements Serializable, AutoCloseable 
{
   private final Function<String, Integer> partitionBucketIdFunc;   // for 
bucket pruning
   private List<String> partitionPaths;                             // cache of 
partition paths
   private final FileStatsIndex fileStatsIndex;                     // for data 
skipping
-  private final Option<RecordLevelIndex> recordLevelIndex;
+  private final Option<BaseRecordLevelIndex> recordLevelIndex;
   private final HoodieTableMetaClient metaClient;
 
   private FileIndex(
@@ -93,7 +93,7 @@ public class FileIndex implements Serializable, AutoCloseable 
{
     this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, conf, 
metaClient);
     this.partitionBucketIdFunc = partitionBucketIdFunc;
     List<ExpressionEvaluators.Evaluator> evaluators = 
Option.ofNullable(colStatsProbe).map(ColumnStatsProbe::getEvaluators).orElse(Collections.emptyList());
-    this.recordLevelIndex = RecordLevelIndex.create(path.toString(), conf, 
metaClient, evaluators, rowType);
+    this.recordLevelIndex = BaseRecordLevelIndex.create(path.toString(), conf, 
metaClient, evaluators, rowType);
     this.metaClient = metaClient;
   }
 
@@ -288,7 +288,7 @@ public class FileIndex implements Serializable, 
AutoCloseable {
   @Override
   public void close() {
     this.fileStatsIndex.close();
-    this.recordLevelIndex.ifPresent(RecordLevelIndex::close);
+    this.recordLevelIndex.ifPresent(BaseRecordLevelIndex::close);
     this.partitionPruner.ifPresent(PartitionPruners.PartitionPruner::close);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java
similarity index 84%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java
index 67e20e9e2282..beb76b02f525 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/BaseRecordLevelIndex.java
@@ -19,9 +19,10 @@
 package org.apache.hudi.source.stats;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieDataUtils;
@@ -31,6 +32,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.record.HoodieRecordIndex;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -45,8 +47,6 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -58,20 +58,20 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * An index support implementation that leverages Record Level Index to prune 
file slices.
+ * Base index support that leverages Record Level Index to prune file slices.
  */
 @Slf4j
-public class RecordLevelIndex implements FlinkMetadataIndex {
+public abstract class BaseRecordLevelIndex implements FlinkMetadataIndex {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(RecordLevelIndex.class);
 
   private final String basePath;
-  private final Configuration conf;
-  private final List<String> hoodieKeysFromFilter;
+  protected final Configuration conf;
+  protected final List<String> hoodieKeysFromFilter;
   private final HoodieTableMetaClient metaClient;
   private HoodieTableMetadata metadataTable;
 
-  private RecordLevelIndex(
+  @VisibleForTesting
+  BaseRecordLevelIndex(
       String basePath,
       Configuration conf,
       HoodieTableMetaClient metaClient,
@@ -109,24 +109,32 @@ public class RecordLevelIndex implements 
FlinkMetadataIndex {
     if (!isIndexAvailable()) {
       return fileSlices;
     }
-    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData = null;
+
     try {
-      recordIndexData =
-          
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
-      List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = 
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
-      Set<String> fileIds = recordIndexLocations.stream()
-          .map(pair -> 
pair.getValue().getFileId()).collect(Collectors.toSet());
-      return fileSlices.stream().filter(fileSlice -> 
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
+      Option<Set<HoodieFileGroupId>> candidateFileGroupIds =
+          lookupCandidateFileGroupIds(fileSlices);
+      return candidateFileGroupIds.map(candidates -> fileSlices.stream()
+          .filter(fileSlice -> candidates.contains(fileSlice.getFileGroupId()))
+          .collect(Collectors.toList()))
+          .orElse(fileSlices);
     } catch (Throwable e) {
       log.error("Failed to read metadata index: {} for data skipping", 
getIndexPartitionName(), e);
       return fileSlices;
-    } finally {
-      // Clean up the RDD to avoid memory leaks
-      
Option.ofNullable(recordIndexData).ifPresent(HoodiePairData::unpersistWithDependencies);
     }
   }
 
-  public static Option<RecordLevelIndex> create(
+  protected abstract Option<Set<HoodieFileGroupId>> 
lookupCandidateFileGroupIds(List<FileSlice> fileSlices);
+
+  protected static Set<HoodieFileGroupId> getFileGroupIds(
+      HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData) {
+    List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations =
+        HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+    return recordIndexLocations.stream()
+        .map(pair -> new HoodieFileGroupId(pair.getValue().getPartitionPath(), 
pair.getValue().getFileId()))
+        .collect(Collectors.toSet());
+  }
+
+  public static Option<BaseRecordLevelIndex> create(
       String basePath,
       Configuration conf,
       HoodieTableMetaClient metaClient,
@@ -145,22 +153,28 @@ public class RecordLevelIndex implements 
FlinkMetadataIndex {
 
     String[] recordKeyFields = 
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
     if (recordKeyFields.length == 0) {
-      LOG.warn("The table do not have record keys, skipping the rli pruning.");
+      log.warn("The table do not have record keys, skipping the rli pruning.");
       return Option.empty();
     }
     boolean consistentLogicalTimestampEnabled = 
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
     List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf, 
metaClient, evaluators, recordKeyFields, rowType, 
consistentLogicalTimestampEnabled);
     if (hoodieKeysFromFilter.isEmpty()) {
-      LOG.warn("The number of keys from query predicate is empty, skipping the 
rli pruning.");
+      log.warn("The number of keys from query predicate is empty, skipping the 
rli pruning.");
       return Option.empty();
     }
     int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
     if (hoodieKeysFromFilter.size() > maxKeyNum) {
-      LOG.warn("The number of keys from query predicate: {} exceeds the upper 
threshold: {}, skipping the rli pruning, the keys: {}",
+      log.warn("The number of keys from query predicate: {} exceeds the upper 
threshold: {}, skipping the rli pruning, the keys: {}",
           hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
       return Option.empty();
     }
-    return Option.of(new RecordLevelIndex(basePath, conf, metaClient, 
hoodieKeysFromFilter));
+    HoodieIndexDefinition indexDefinition = 
metaClient.getIndexForMetadataPartition(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).orElse(null);
+    if (indexDefinition == null) {
+      return Option.empty();
+    }
+    return Option.of(HoodieRecordIndex.isPartitioned(indexDefinition)
+        ? new RecordLevelIndex(basePath, conf, metaClient, 
hoodieKeysFromFilter)
+        : new GlobalRecordLevelIndex(basePath, conf, metaClient, 
hoodieKeysFromFilter));
   }
 
   /**
@@ -196,7 +210,7 @@ public class RecordLevelIndex implements FlinkMetadataIndex 
{
             : normalizeLiteral(val, keyField, fieldType, 
consistentLogicalTimestampEnabled)));
       }
       if (recordKeys.isEmpty()) {
-        LOG.info("No literals found for the record key: {}, therefore 
filtering can not be performed", keyField);
+        log.info("No literals found for the record key: {}, therefore 
filtering can not be performed", keyField);
         return Collections.emptyList();
       } else if (!isComplexRecordKey || hoodieKeys.isEmpty()) {
         hoodieKeys = recordKeys;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/GlobalRecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/GlobalRecordLevelIndex.java
new file mode 100644
index 000000000000..9f93f8757e2c
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/GlobalRecordLevelIndex.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An index support implementation that leverages global Record Level Index to 
prune file slices.
+ */
+public class GlobalRecordLevelIndex extends BaseRecordLevelIndex {
+  private static final long serialVersionUID = 1L;
+
+  GlobalRecordLevelIndex(
+      String basePath,
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      List<String> hoodieKeysFromFilter) {
+    super(basePath, conf, metaClient, hoodieKeysFromFilter);
+  }
+
+  @Override
+  protected Option<Set<HoodieFileGroupId>> 
lookupCandidateFileGroupIds(List<FileSlice> fileSlices) {
+    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+        
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
+    return Option.of(getFileGroupIds(recordIndexData));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
index 67e20e9e2282..9263aaf5f9ac 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
@@ -22,262 +22,152 @@ import 
org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.sink.bulk.RowDataKeyGen;
-import org.apache.hudi.source.ExpressionEvaluators;
-import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * An index support implementation that leverages Record Level Index to prune 
file slices.
+ * An index support implementation that leverages partitioned Record Level 
Index to prune file slices.
+ *
+ * <p>Unlike the global RLI, record keys in partitioned RLI are only unique 
within a data-table partition.
+ * This implementation therefore scopes metadata-table lookups by candidate 
data partition. To avoid reading
+ * every RLI shard for each candidate partition, it first groups 
filter-derived record keys by the shard they
+ * hash to within each partition, then performs one metadata lookup per {@code 
(partition, shard)} group.
+ *
+ * <p>The grouped lookups use bounded local parallelism through {@link 
HoodieFlinkEngineContext#map(List,
+ * org.apache.hudi.common.function.SerializableFunction, int)}. This is 
intentionally not implemented with
+ * {@code HoodieFlinkEngineContext#parallelize}, because the Flink context 
backs {@code parallelize} with
+ * in-memory {@code HoodieListData}, not distributed Flink tasks.
  */
 @Slf4j
-public class RecordLevelIndex implements FlinkMetadataIndex {
+public class RecordLevelIndex extends BaseRecordLevelIndex {
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(RecordLevelIndex.class);
 
-  private final String basePath;
-  private final Configuration conf;
-  private final List<String> hoodieKeysFromFilter;
-  private final HoodieTableMetaClient metaClient;
-  private HoodieTableMetadata metadataTable;
+  /**
+   * Upper bound on the number of candidate data-table partitions eligible for 
a partitioned RLI lookup.
+   *
+   * <p>Unlike the global RLI (a single lookup over all keys), the partitioned 
variant performs one metadata-table
+   * read per candidate partition. When a query does not filter on the 
partition column the candidate set can span
+   * many partitions, and fanning out a lookup to each one can add latency 
that outweighs the skipping benefit.
+   * Once the candidate partition count exceeds this threshold, pruning is 
skipped.
+   */
+  private static final int MAX_PARTITIONS = 3;
 
-  private RecordLevelIndex(
+  RecordLevelIndex(
       String basePath,
       Configuration conf,
       HoodieTableMetaClient metaClient,
       List<String> hoodieKeysFromFilter) {
-    this.basePath = basePath;
-    this.conf = conf;
-    this.metaClient = metaClient;
-    this.hoodieKeysFromFilter = hoodieKeysFromFilter;
-  }
-
-  @Override
-  public String getIndexPartitionName() {
-    return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+    super(basePath, conf, metaClient, hoodieKeysFromFilter);
   }
 
+  /**
+   * Finds candidate data file groups by querying partitioned RLI for the 
record keys extracted from filters.
+   *
+   * <p>The method fails open through {@link 
BaseRecordLevelIndex#computeCandidateFileSlices(List)} if metadata
+   * lookup fails. Returning {@link Option#empty()} is reserved for cases 
where pruning should be skipped without
+   * treating it as an error, such as exceeding the configured candidate 
partition threshold.
+   */
   @Override
-  public boolean isIndexAvailable() {
-    return metaClient.getTableConfig().isMetadataTableAvailable()
-        && 
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
-  }
-
-  public HoodieTableMetadata getMetadataTable() {
-    // initialize the metadata table lazily
-    if (this.metadataTable == null) {
-      this.metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
-          HoodieFlinkEngineContext.DEFAULT,
-          metaClient.getStorage(),
-          StreamerUtil.metadataConfig(conf),
-          basePath);
-    }
-    return this.metadataTable;
-  }
-
-  public List<FileSlice> computeCandidateFileSlices(List<FileSlice> 
fileSlices) {
-    if (!isIndexAvailable()) {
-      return fileSlices;
-    }
-    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData = null;
-    try {
-      recordIndexData =
-          
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
-      List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = 
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
-      Set<String> fileIds = recordIndexLocations.stream()
-          .map(pair -> 
pair.getValue().getFileId()).collect(Collectors.toSet());
-      return fileSlices.stream().filter(fileSlice -> 
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
-    } catch (Throwable e) {
-      log.error("Failed to read metadata index: {} for data skipping", 
getIndexPartitionName(), e);
-      return fileSlices;
-    } finally {
-      // Clean up the RDD to avoid memory leaks
-      
Option.ofNullable(recordIndexData).ifPresent(HoodiePairData::unpersistWithDependencies);
-    }
-  }
-
-  public static Option<RecordLevelIndex> create(
-      String basePath,
-      Configuration conf,
-      HoodieTableMetaClient metaClient,
-      List<ExpressionEvaluators.Evaluator> evaluators,
-      RowType rowType) {
-    if (evaluators.isEmpty() || 
!FlinkOptions.QUERY_TYPE_SNAPSHOT.equalsIgnoreCase(conf.get(FlinkOptions.QUERY_TYPE)))
 {
+  protected Option<Set<HoodieFileGroupId>> 
lookupCandidateFileGroupIds(List<FileSlice> fileSlices) {
+    Set<String> partitions = fileSlices.stream()
+        .map(FileSlice::getPartitionPath)
+        .collect(Collectors.toSet());
+    if (partitions.isEmpty()) {
       return Option.empty();
     }
-    if (metaClient == null) {
-      metaClient = StreamerUtil.createMetaClient(conf);
-    }
-    // disallow RLI for new encoding with complex key gen when the table 
version is lower than NINE.
-    if 
(KeyGenUtils.mayUseNewEncodingForComplexKeyGen(metaClient.getTableConfig())) {
+    if (partitions.size() > MAX_PARTITIONS) {
+      log.info("The number of candidate partitions {} exceeds the partitioned 
record level index lookup threshold {}. Skipping pruning.",
+          partitions.size(), MAX_PARTITIONS);
       return Option.empty();
     }
 
-    String[] recordKeyFields = 
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
-    if (recordKeyFields.length == 0) {
-      LOG.warn("The table do not have record keys, skipping the rli pruning.");
-      return Option.empty();
-    }
-    boolean consistentLogicalTimestampEnabled = 
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
-    List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf, 
metaClient, evaluators, recordKeyFields, rowType, 
consistentLogicalTimestampEnabled);
-    if (hoodieKeysFromFilter.isEmpty()) {
-      LOG.warn("The number of keys from query predicate is empty, skipping the 
rli pruning.");
-      return Option.empty();
-    }
-    int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
-    if (hoodieKeysFromFilter.size() > maxKeyNum) {
-      LOG.warn("The number of keys from query predicate: {} exceeds the upper 
threshold: {}, skipping the rli pruning, the keys: {}",
-          hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
-      return Option.empty();
-    }
-    return Option.of(new RecordLevelIndex(basePath, conf, metaClient, 
hoodieKeysFromFilter));
+    Set<HoodieFileGroupId> fileGroupIds = new HashSet<>();
+    HoodieTableMetadata metadataTable = getMetadataTable();
+    Map<String, List<FileSlice>> fileGroupsByDataPartition =
+        
metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX);
+    // Build one lookup group per RLI shard within a data partition so each 
metadata-table lookup
+    // only needs to resolve keys targeting that shard.
+    List<PartitionShardKeys> lookupGroups = 
groupKeysByPartitionAndShard(partitions, fileGroupsByDataPartition)
+        .entrySet().stream()
+        .flatMap(partitionEntry -> partitionEntry.getValue().values().stream()
+            .map(keysInSingleShard -> new 
PartitionShardKeys(partitionEntry.getKey(), keysInSingleShard)))
+        .collect(Collectors.toList());
+    // HoodieFlinkEngineContext#parallelize returns in-memory HoodieListData; 
use map(...) for bounded
+    // local parallelism over the already bucketed (partition, shard) lookup 
groups.
+    HoodieFlinkEngineContext.DEFAULT.map(
+        lookupGroups,
+        lookupGroup -> lookupFileGroupIds(metadataTable, lookupGroup),
+        getLookupParallelism(lookupGroups.size()))
+        .forEach(fileGroupIds::addAll);
+    return Option.of(fileGroupIds);
   }
 
   /**
-   * Given query filters, it filters the EqualTo, IN and OR queries on record 
key columns and
-   * returns the list of record key literals present in the query, for example:
-   * <p>
-   * filter1: `key1` = 'val1', returns {"val1"}
-   * filter2: `key1` in ('val1', 'val2', 'val3'), returns {"val1", "vale", 
"val3"}
-   * filter3: `key1` = 'val1' OR `key1` = 'val2' or `key1` = 'val3', returns 
{"val1", "vale", "val3"}
-   * filter4: `key1` = 'val1' AND `key2` in ('val2', 'val3'), returns 
{"key1:val1,key2:val2", "key1:val1,key2:val3"}
+   * Groups filter-derived record keys by data-table partition and RLI shard.
+   *
+   * <p>The lookup API is partition-scoped. This helper additionally splits 
keys by RLI shard from the
+   * bucketized partitioned-RLI file groups returned by the metadata table.
    */
-  @VisibleForTesting
-  public static List<String> computeHoodieKeyFromFilters(
-      Configuration conf,
-      HoodieTableMetaClient metaClient,
-      List<ExpressionEvaluators.Evaluator> evaluators,
-      String[] keyFields,
-      RowType rowType,
-      boolean consistentLogicalTimestampEnabled) {
-    String[] partitionFields = 
metaClient.getTableConfig().getPartitionFields().orElse(new String[0]);
-    // align with the check logic in RowDataKeyGen
-    boolean isComplexRecordKey = keyFields.length > 1 || 
partitionFields.length > 1 && 
!OptionsResolver.useComplexKeygenNewEncoding(conf);
-    List<String> hoodieKeys = new ArrayList<>();
-    List<String> fieldNames = rowType.getFieldNames();
-    for (String keyField: keyFields) {
-      List<String> recordKeys = new ArrayList<>();
-      LogicalType fieldType = rowType.getTypeAt(fieldNames.indexOf(keyField));
-      for (ExpressionEvaluators.Evaluator evaluator: evaluators) {
-        // if there exists multiple ref fields in an evaluator, ignore this 
evaluator, e.g., key = 'key1' or age = 20
-        List<Object> literals = collectLiterals(evaluator, keyField);
-        literals.forEach(val -> recordKeys.add(isComplexRecordKey
-            ? keyField + KeyGenerator.DEFAULT_COLUMN_VALUE_SEPARATOR + 
normalizeLiteral(val, keyField, fieldType, consistentLogicalTimestampEnabled)
-            : normalizeLiteral(val, keyField, fieldType, 
consistentLogicalTimestampEnabled)));
-      }
-      if (recordKeys.isEmpty()) {
-        LOG.info("No literals found for the record key: {}, therefore 
filtering can not be performed", keyField);
-        return Collections.emptyList();
-      } else if (!isComplexRecordKey || hoodieKeys.isEmpty()) {
-        hoodieKeys = recordKeys;
-      } else {
-        // Combine literals for this configured record key with literals for 
the other configured record keys
-        // If there are two literals for rk1, rk2, rk3 each. A total of 8 
combinations will be generated
-        List<String> tmpHoodieKeys = new ArrayList<>();
-        for (String compositeKey: hoodieKeys) {
-          for (String recordKey: recordKeys) {
-            tmpHoodieKeys.add(compositeKey + 
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + recordKey);
-          }
-        }
-        hoodieKeys = tmpHoodieKeys;
-      }
-    }
-    return hoodieKeys;
+  private Map<String, Map<Integer, List<String>>> groupKeysByPartitionAndShard(
+      Set<String> partitions,
+      Map<String, List<FileSlice>> fileGroupsByDataPartition) {
+    return partitions.stream().collect(Collectors.toMap(
+        partition -> partition,
+        partition -> {
+          List<FileSlice> fileGroups = 
fileGroupsByDataPartition.get(partition);
+          ValidationUtils.checkState(fileGroups != null && 
!fileGroups.isEmpty(),
+              "No record index file groups found for data partition: " + 
partition);
+          return hoodieKeysFromFilter.stream().collect(Collectors.groupingBy(
+              key -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, 
fileGroups.size()),
+              Collectors.toCollection(ArrayList::new)));
+        }));
   }
 
   /**
-   * Collect literal values for record key fields from the predicate.
+   * Looks up one pre-bucketed {@code (partition, shard)} group and converts 
matched RLI locations to file group IDs.
    */
-  private static List<Object> collectLiterals(ExpressionEvaluators.Evaluator 
evaluator, String refName) {
-    if (evaluator instanceof ExpressionEvaluators.LeafEvaluator
-        && !((ExpressionEvaluators.LeafEvaluator) 
evaluator).getName().equalsIgnoreCase(refName)) {
-      return Collections.emptyList();
-    }
-    if (evaluator instanceof ExpressionEvaluators.EqualTo) {
-      Object valueLiteral = ((ExpressionEvaluators.EqualTo) 
evaluator).getVal();
-      return valueLiteral == null ? Collections.emptyList() : 
Collections.singletonList(valueLiteral);
-    } else if (evaluator instanceof ExpressionEvaluators.In) {
-      Object[] valueLiterals = ((ExpressionEvaluators.In) evaluator).getVals();
-      if (valueLiterals.length < 1 || 
Arrays.stream(valueLiterals).anyMatch(Objects::isNull)) {
-        return Collections.emptyList();
-      }
-      return Arrays.stream(valueLiterals).collect(Collectors.toList());
-    } else if (evaluator instanceof ExpressionEvaluators.Or) {
-      List<List<Object>> literalsList = 
Arrays.stream(((ExpressionEvaluators.Or) evaluator).getEvaluators())
-          .map(eval -> collectLiterals(eval, 
refName)).collect(Collectors.toList());
-      // if any child expr do not contain predicate on the key, just return 
empty list
-      if (literalsList.stream().anyMatch(List::isEmpty)) {
-        return Collections.emptyList();
-      }
-      return 
literalsList.stream().flatMap(List::stream).distinct().collect(Collectors.toList());
-    } else {
-      return Collections.emptyList();
-    }
+  private static Set<HoodieFileGroupId> lookupFileGroupIds(
+      HoodieTableMetadata metadataTable,
+      PartitionShardKeys lookupGroup) {
+    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+        metadataTable.readRecordIndexLocationsWithKeys(
+            HoodieListData.eager(lookupGroup.keysInSingleShard), 
Option.of(lookupGroup.partition));
+    return getFileGroupIds(recordIndexData);
   }
 
   /**
-   * Normalize literal values before used to get record index locations.
+   * Caps local lookup parallelism by available processors and the number of 
non-empty lookup groups.
    */
-  private static String normalizeLiteral(Object value, String keyField, 
LogicalType fieldType, boolean consistentLogicalTimestampEnabled) {
-    switch (fieldType.getTypeRoot()) {
-      case DECIMAL:
-        // the scale of decimal data in predicate may not be aligned with that 
in record index, padding 0 if necessary,
-        // e.g., 1.11 with target scale 5, return 1.11000
-        BigDecimal decimal = (BigDecimal) value;
-        int targetScale = ((DecimalType) fieldType).getScale();
-        value = decimal.scale() >= targetScale ? value : 
decimal.setScale(targetScale);
-        break;
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-        // the original value is extracted from literal by 
ExpressionUtils#getValueFromLiteral, which is epoch millis
-        // convert it back to TimestampData before reusing key generating 
logic in RowDataKeyGen.
-        value = TimestampData.fromEpochMillis((Long) value);
-        break;
-      default:
-        break;
-    }
-    // to align with the hoodie key generating logic in writer side.
-    return RowDataKeyGen.getRecordKey(value, keyField, 
consistentLogicalTimestampEnabled);
+  private static int getLookupParallelism(int lookupGroupCount) {
+    return Math.max(1, Math.min(lookupGroupCount, 
Runtime.getRuntime().availableProcessors()));
   }
 
-  @Override
-  public void close() {
-    if (this.metadataTable == null) {
-      return;
-    }
-    try {
-      this.metadataTable.close();
-    } catch (Exception e) {
-      throw new HoodieException("Exception happened during close metadata 
table.", e);
+  /**
+   * Keys that hash to one RLI shard within one data-table partition.
+   */
+  private static class PartitionShardKeys {
+    private final String partition;
+    private final List<String> keysInSingleShard;
+
+    private PartitionShardKeys(String partition, List<String> 
keysInSingleShard) {
+      this.partition = partition;
+      this.keysInSingleShard = keysInSingleShard;
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index ae7aaa022df3..d421f6e72098 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -260,6 +260,73 @@ public class TestFileIndex {
     assertThat(fileSlices.size(), is(expectedCnt));
   }
 
+  @ParameterizedTest
+  @MethodSource("filtersAndResults")
+  void testFileListingWithPartitionedRecordLevelIndex(String recordFields, 
ColumnStatsProbe probe, int maxKeyCnt, int expectedCnt) throws Exception {
+    DataType dataType = TestConfigurations.ROW_DATA_TYPE_WITH_ATOMIC_TYPES;
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), dataType);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, recordFields);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM, maxKeyCnt);
+    // Enable partitioned record level index specifically for this test
+    conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key(), 
"true");
+
+    // Write test data
+    TestData.writeData(TestData.DATA_SET_WITH_ATOMIC_TYPES, conf);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    // Create a filter on the record key 'uuid' with EQUALS operator to 
trigger record-level index
+    FileIndex fileIndex =
+        FileIndex.builder()
+            .path(new StoragePath(tempFile.getAbsolutePath()))
+            .conf(conf)
+            .rowType((RowType) dataType.getLogicalType())
+            .metaClient(metaClient)
+            .columnStatsProbe(probe)
+            .build();
+
+    // Get filtered file slices - this should use partitioned record-level 
index data skipping
+    List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+    assertThat(fileSlices.size(), is(expectedCnt));
+  }
+
+  @Test
+  void testFileListingWithPartitionedRecordLevelIndexExceedingMaxPartitions() 
throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.setString(HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key(), 
"true");
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    // record predicate `f_str` = 'str1' would normally prune to a single file 
slice
+    CallExpression equalExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("id1", DataTypes.STRING().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probe = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalExpr));
+    FileIndex fileIndex =
+        FileIndex.builder()
+            .path(new StoragePath(tempFile.getAbsolutePath()))
+            .conf(conf)
+            .rowType(TestConfigurations.ROW_TYPE)
+            .metaClient(metaClient)
+            .columnStatsProbe(probe)
+            .build();
+
+    // the number of candidate partitions (4) exceeds the configured threshold 
(3),
+    // so partitioned record level index pruning is skipped and all file 
slices are returned
+    List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+    assertThat(fileSlices.size(), is(4));
+  }
+
   private static Stream<Arguments> filtersAndResults() {
     CallExpression equalTinyInt = CallExpression.permanent(
         BuiltInFunctionDefinitions.EQUALS,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
index a23a5247139c..1aae2af4ed7c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
@@ -18,10 +18,19 @@
 
 package org.apache.hudi.source.stats;
 
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.source.ExpressionEvaluators;
 import org.apache.hudi.utils.TestConfigurations;
@@ -49,12 +58,28 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -75,6 +100,182 @@ public class TestRecordLevelIndex {
       .notNull();
   private static final RowType ROW_TYPE_MULTI_KEYS = (RowType) 
ROW_DATA_TYPE_MULTI_KEYS.getLogicalType();
 
+  @Test
+  void testPartitionedRliGroupsKeysByShardWithinPartition() {
+    HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+    List<String> recordKeys = keysAcrossShards(2);
+    RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable, 
recordKeys);
+    List<FileSlice> fileSlices = Arrays.asList(
+        fileSlice("par1", "file1"),
+        fileSlice("par1", "file2"));
+    Map<String, List<FileSlice>> partitionedRliFileGroups = new HashMap<>();
+    partitionedRliFileGroups.put("par1", Arrays.asList(
+        fileSlice("par1", "rli-file1"),
+        fileSlice("par1", "rli-file2")));
+    
when(metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX))
+        .thenReturn(partitionedRliFileGroups);
+    doReturn(HoodieListPairData.eager(Collections.singletonList(
+        Pair.of(recordKeys.get(0), new HoodieRecordGlobalLocation("par1", 
"001", "file1")))))
+        .when(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par1")));
+
+    List<FileSlice> result = 
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+    assertEquals(Collections.singletonList(fileSlices.get(0)), result);
+    Set<Set<String>> expectedKeyGroups = recordKeys.stream()
+        .collect(Collectors.groupingBy(key -> 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, 2)))
+        .values().stream()
+        .map(HashSet::new)
+        .collect(Collectors.toSet());
+    verify(metadataTable, 
times(expectedKeyGroups.size())).readRecordIndexLocationsWithKeys(
+        argThat(keys -> expectedKeyGroups.contains(new 
HashSet<>(((HoodieData<String>) keys).collectAsList()))),
+        eq(Option.of("par1")));
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+  }
+
+  @Test
+  void testPartitionedRliPrunesAtPartitionThreshold() {
+    HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+    RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+    List<FileSlice> fileSlices = Arrays.asList(
+        fileSlice("par1", "file1"),
+        fileSlice("par2", "file2"),
+        fileSlice("par3", "file3"));
+    
when(metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX))
+        .thenReturn(partitionedRliFileGroups("par1", "par2", "par3"));
+    doAnswer(invocation -> {
+      String partition = invocation.<Option<String>>getArgument(1).get();
+      return HoodieListPairData.eager(Collections.singletonList(
+          Pair.of("id1", new HoodieRecordGlobalLocation(
+              partition, "001", "file" + partition.substring(3)))));
+    })
+        .when(metadataTable).readRecordIndexLocationsWithKeys(any(), any());
+
+    List<FileSlice> result = 
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+    assertEquals(fileSlices, result);
+    verify(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par1")));
+    verify(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par2")));
+    verify(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par3")));
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+  }
+
+  @Test
+  void testPartitionedRliSkipsPruningAbovePartitionThreshold() {
+    HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+    RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+    List<FileSlice> fileSlices = Arrays.asList(
+        fileSlice("par1", "file1"),
+        fileSlice("par2", "file2"),
+        fileSlice("par3", "file3"),
+        fileSlice("par4", "file4"));
+
+    List<FileSlice> result = 
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+    assertSame(fileSlices, result);
+    verify(metadataTable, 
never()).getBucketizedFileGroupsForPartitionedRLI(any());
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(), 
any());
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+  }
+
+  @Test
+  void testPartitionedRliFallsBackWhenPartitionLookupFails() {
+    HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+    RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+    List<FileSlice> fileSlices = Arrays.asList(
+        fileSlice("par1", "file1"),
+        fileSlice("par2", "file2"));
+    
when(metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX))
+        .thenReturn(partitionedRliFileGroups("par1", "par2"));
+    doReturn(HoodieListPairData.eager(Collections.singletonList(
+        Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001", 
"file1")))))
+        .when(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par1")));
+    doThrow(new RuntimeException("lookup failure"))
+        .when(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par2")));
+
+    List<FileSlice> result = 
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+    assertSame(fileSlices, result);
+    verify(metadataTable).readRecordIndexLocationsWithKeys(any(), 
eq(Option.of("par2")));
+  }
+
+  @Test
+  void testPartitionedRliSkipsPruningForNoCandidateFiles() {
+    HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+    RecordLevelIndex recordLevelIndex = createRecordLevelIndex(metadataTable);
+    List<FileSlice> fileSlices = Collections.emptyList();
+
+    List<FileSlice> result = 
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+    assertSame(fileSlices, result);
+    verify(metadataTable, 
never()).getBucketizedFileGroupsForPartitionedRLI(any());
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(), 
any());
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any());
+  }
+
+  @Test
+  void testGlobalRliMatchesPartitionAndFileId() {
+    HoodieTableMetadata metadataTable = mock(HoodieTableMetadata.class);
+    GlobalRecordLevelIndex recordLevelIndex = 
createGlobalRecordLevelIndex(metadataTable);
+    List<FileSlice> fileSlices = Arrays.asList(
+        fileSlice("par1", "file1"),
+        fileSlice("par2", "file1"));
+    when(metadataTable.readRecordIndexLocationsWithKeys(any()))
+        .thenReturn(HoodieListPairData.eager(Collections.singletonList(
+            Pair.of("id1", new HoodieRecordGlobalLocation("par1", "001", 
"file1")))));
+
+    List<FileSlice> result = 
recordLevelIndex.computeCandidateFileSlices(fileSlices);
+
+    assertEquals(Collections.singletonList(fileSlices.get(0)), result);
+    verify(metadataTable).readRecordIndexLocationsWithKeys(any());
+    verify(metadataTable, never()).readRecordIndexLocationsWithKeys(any(), 
any());
+  }
+
+  private RecordLevelIndex createRecordLevelIndex(HoodieTableMetadata 
metadataTable) {
+    return createRecordLevelIndex(metadataTable, 
Collections.singletonList("id1"));
+  }
+
+  private RecordLevelIndex createRecordLevelIndex(HoodieTableMetadata 
metadataTable, List<String> recordKeys) {
+    mockAvailableRecordIndex();
+    RecordLevelIndex recordLevelIndex = spy(
+        new RecordLevelIndex("", new Configuration(), metaClient, recordKeys));
+    
lenient().doReturn(metadataTable).when(recordLevelIndex).getMetadataTable();
+    return recordLevelIndex;
+  }
+
+  private GlobalRecordLevelIndex 
createGlobalRecordLevelIndex(HoodieTableMetadata metadataTable) {
+    mockAvailableRecordIndex();
+    GlobalRecordLevelIndex recordLevelIndex = spy(
+        new GlobalRecordLevelIndex("", new Configuration(), metaClient, 
Collections.singletonList("id1")));
+    
lenient().doReturn(metadataTable).when(recordLevelIndex).getMetadataTable();
+    return recordLevelIndex;
+  }
+
+  private void mockAvailableRecordIndex() {
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.isMetadataTableAvailable()).thenReturn(true);
+    
when(tableConfig.getMetadataPartitions()).thenReturn(Collections.singleton(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX));
+  }
+
+  private static FileSlice fileSlice(String partition, String fileId) {
+    return new FileSlice(new HoodieFileGroupId(partition, fileId), "001");
+  }
+
+  private static Map<String, List<FileSlice>> 
partitionedRliFileGroups(String... partitions) {
+    Map<String, List<FileSlice>> partitionedRliFileGroups = new HashMap<>();
+    Arrays.stream(partitions).forEach(partition ->
+        partitionedRliFileGroups.put(partition, 
Collections.singletonList(fileSlice(partition, "rli-" + partition))));
+    return partitionedRliFileGroups;
+  }
+
+  private static List<String> keysAcrossShards(int fileGroupCount) {
+    Map<Integer, String> keysByShard = new HashMap<>();
+    for (int i = 0; keysByShard.size() < fileGroupCount; i++) {
+      String key = "id" + i;
+      
keysByShard.putIfAbsent(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
 fileGroupCount), key);
+    }
+    return keysByShard.values().stream().collect(Collectors.toList());
+  }
+
   private List<ExpressionEvaluators.Evaluator> 
createColumnStatsProbe(BuiltInFunctionDefinition func, String refName, 
List<String> vals) {
     List<ResolvedExpression> args = vals.stream().map(
         val -> new ValueLiteralExpression(val, 
DataTypes.STRING().notNull())).collect(Collectors.toList());
@@ -125,7 +326,7 @@ public class TestRecordLevelIndex {
     List<ExpressionEvaluators.Evaluator> evaluators = createColumnStatsProbe(
         BuiltInFunctionDefinitions.EQUALS, "uuid", 
Collections.singletonList("id1"));
     String[] recordKeyFields = {"uuid"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
     assertEquals(Collections.singletonList("id1"), result, "Should return the 
simple record key value");
   }
@@ -142,7 +343,7 @@ public class TestRecordLevelIndex {
     // Test with IN operator
     List<ExpressionEvaluators.Evaluator> evaluators = 
createColumnStatsProbe(BuiltInFunctionDefinitions.IN, "uuid", 
Arrays.asList("id1", "id2", "id3"));
     String[] recordKeyFields = {"uuid"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
     assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return 
all the IN operator values");
   }
@@ -159,7 +360,7 @@ public class TestRecordLevelIndex {
     // Test with OR operator (which should be converted to IN)
     List<ExpressionEvaluators.Evaluator> evaluators = 
createOrColumnStatsProbe("uuid", Arrays.asList("id1", "id2", "id3"));
     String[] recordKeyFields = {"uuid"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
     // Note: OR with two values "id1" and "id2" should result in the literals 
from both evaluators
     assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return 
values from OR operator");
@@ -188,7 +389,7 @@ public class TestRecordLevelIndex {
             DataTypes.BOOLEAN())
     );
     String[] recordKeyFields = {"key1", "key2"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, ExpressionEvaluators.fromExpression(expressions), 
recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
     // For complex keys, the format should be key1:val1,key2:val2
     assertEquals(Arrays.asList("key1:val1" + 
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "key2:val2"), result,
@@ -223,7 +424,7 @@ public class TestRecordLevelIndex {
     );
     String[] recordKeyFields = {"f_timestamp", "f_decimal"};
     RowType rowType = (RowType) 
ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getLogicalType();
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, ExpressionEvaluators.fromExpression(expressions), 
recordKeyFields, rowType, consistentLogicalTimestampEnabled);
     String expectedTimestampVal = 
RowDataKeyGen.getRecordKey(TimestampData.fromEpochMillis(1), "f_timestamp", 
consistentLogicalTimestampEnabled);
     assertEquals(Arrays.asList("f_timestamp:" + expectedTimestampVal + 
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "f_decimal:1.10"), result,
@@ -254,7 +455,7 @@ public class TestRecordLevelIndex {
             DataTypes.BOOLEAN())
     );
     String[] recordKeyFields = {"uuid"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, ExpressionEvaluators.fromExpression(expressions), 
recordKeyFields, TestConfigurations.ROW_TYPE, false);
     // This should return both values
     assertEquals(Arrays.asList("id1", "id2"), result, "Should return multiple 
values for same field");
@@ -273,7 +474,7 @@ public class TestRecordLevelIndex {
     List<ExpressionEvaluators.Evaluator> evaluators = createColumnStatsProbe(
         BuiltInFunctionDefinitions.EQUALS, "nonKeyField", 
Collections.singletonList("val1"));
     String[] recordKeyFields = {"uuid"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
     assertEquals(Collections.emptyList(), result, "Should return empty list 
when filtering on non-record key field");
 
@@ -293,7 +494,7 @@ public class TestRecordLevelIndex {
         DataTypes.BOOLEAN());
 
     evaluators = 
ExpressionEvaluators.fromExpression(Collections.singletonList(orExpression));
-    result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
     assertEquals(Collections.emptyList(), result, "Should return empty list 
when filtering on or predicate including multiple fields");
   }
@@ -324,7 +525,7 @@ public class TestRecordLevelIndex {
             DataTypes.BOOLEAN())
     );
     String[] recordKeyFields = {"key1", "key2"};
-    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+    List<String> result = BaseRecordLevelIndex.computeHoodieKeyFromFilters(
         conf, metaClient, ExpressionEvaluators.fromExpression(expressions), 
recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
     // Should have 4 combinations: (val1,val3), (val1,val4), (val2,val3), 
(val2,val4)
     List<String> expected = Arrays.asList(
@@ -335,4 +536,4 @@ public class TestRecordLevelIndex {
     );
     assertEquals(expected, result, "Should return all combinations of complex 
record keys");
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 749da56c8a16..dbe596007f7a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -614,6 +614,54 @@ public class ITTestHoodieDataSource {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testDataSkippingWithPartitionedRecordLevelIndex(
+      HoodieTableType tableType, boolean useSourceV2) throws Exception {
+    String writerTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.RECORD_LEVEL_INDEX.name())
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .end();
+    streamTableEnv.executeSql(writerTableDDL);
+    execInsertSql(streamTableEnv, TestSQL.INSERT_T1);
+
+    String readerTableDDL = sql("t1_read")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+        .end();
+    batchTableEnv.executeSql(readerTableDDL);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> batchTableEnv.sqlQuery(
+            "select * from t1_read where `partition` = 'par1' and uuid = 
'id1'").execute().collect());
+    assertRowsEquals(result, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, 
par1]]");
+
+    List<Row> multiPartitionResult = CollectionUtil.iterableToList(
+        () -> batchTableEnv.sqlQuery(
+            "select * from t1_read where `partition` in ('par1', 'par4') and 
uuid in ('id1', 'id7')").execute().collect());
+    assertRowsEquals(multiPartitionResult, "["
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4]]");
+
+    // insert a record with id1 into the par4.
+    String insert = "insert into t1 values ('id1','Jack',23,TIMESTAMP 
'1970-01-01 00:00:01','par4')";
+    execInsertSql(streamTableEnv, insert);
+    // test scenario query predicate also includes a partition name which 
doesn't exist.
+    multiPartitionResult = CollectionUtil.iterableToList(
+        () -> batchTableEnv.sqlQuery(
+            "select * from t1_read where `partition` in ('par1', 'par4', 
'par5') and uuid in ('id1', 'id7')").execute().collect());
+    assertRowsEqualsUnordered(multiPartitionResult,
+        Arrays.asList(
+            "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]",
+            "+I[id1, Jack, 23, 1970-01-01T00:00:01, par4]",
+            "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4]"));
+  }
+
   @ParameterizedTest
   @MethodSource("tableTypeAndSourceV2AndBooleanTrueFalseParams")
   void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean 
useSourceV2, boolean hiveStylePartitioning) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 2c8f5c29640f..e227e8bd71d4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -674,6 +674,17 @@ public class TestData {
     return obj == null ? "null" : obj.toString();
   }
 
+  /**
+   * Assert that expected row strings and actual collection of rows are equal 
regardless of the order.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected row strings
+   */
+  public static void assertRowsEqualsUnordered(List<Row> rows, List<String> 
expected) {
+    List<String> actualRows = 
rows.stream().map(Row::toString).collect(Collectors.toList());
+    assertEquals(new HashSet<>(expected), new HashSet<>(actualRows));
+  }
+
   /**
    * Sort the {@code rows} using field at index 0 and asserts
    * it equals with the expected string {@code expected}.

Reply via email to