This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ffad38863f0 feat: Support data skipping based on record index for 
flink reader (#17490)
4ffad38863f0 is described below

commit 4ffad38863f03e753e9f9a6fc5bcfbc79a125ae5
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Dec 17 10:39:07 2025 +0800

    feat: Support data skipping based on record index for flink reader (#17490)
---
 .../client/common/HoodieFlinkEngineContext.java    |  11 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  12 +
 .../org/apache/hudi/sink/bulk/RowDataKeyGen.java   |   2 +-
 .../apache/hudi/source/ExpressionEvaluators.java   |  16 +
 .../java/org/apache/hudi/source/FileIndex.java     |  34 ++-
 .../apache/hudi/source/prune/ColumnStatsProbe.java |   4 +
 .../apache/hudi/source/prune/PartitionPruners.java |  16 +-
 .../apache/hudi/source/stats/ColumnStatsIndex.java |   9 +-
 .../apache/hudi/source/stats/FileStatsIndex.java   |  30 +-
 ...lumnStatsIndex.java => FlinkMetadataIndex.java} |  30 +-
 .../hudi/source/stats/PartitionStatsIndex.java     |   6 +
 .../apache/hudi/source/stats/RecordLevelIndex.java | 277 +++++++++++++++++
 .../org/apache/hudi/table/HoodieTableSource.java   |  99 +++---
 .../java/org/apache/hudi/source/TestFileIndex.java | 216 +++++++++++++
 .../hudi/source/TestIncrementalInputSplits.java    |   3 +-
 .../hudi/source/stats/TestRecordLevelIndex.java    | 338 +++++++++++++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  30 ++
 .../apache/hudi/table/TestHoodieTableSource.java   |   7 +-
 .../org/apache/hudi/utils/TestConfigurations.java  |  22 ++
 .../test/java/org/apache/hudi/utils/TestData.java  |   8 +
 20 files changed, 1068 insertions(+), 102 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index edbdc1ef0703..e993d3ccfa52 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -37,9 +37,11 @@ import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ClosableSortingIterator;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -236,7 +238,14 @@ public class HoodieFlinkEngineContext extends 
HoodieEngineContext {
                                                                                
             SerializableFunction<Iterator<V>, Iterator<R>> processFunc,
                                                                                
             List<K> keySpace,
                                                                                
             boolean preservesPartitioning) {
-    throw new UnsupportedOperationException("processKeyGroups() is not 
supported in FlinkEngineContext");
+    // Group values by key and apply the function to each group in parallel
+    List<Iterable<V>> groupedValues = 
data.groupByKey().values().collectAsList();
+    // Process each group in parallel using parallel stream
+    List<R> results = executeParallelStream(
+        groupedValues.parallelStream(),
+        stream -> stream.map(values -> 
throwingMapWrapper(processFunc).apply(new 
ClosableSortingIterator<>(values.iterator()))),
+        
groupedValues.size()).flatMap(CollectionUtils::toStream).collect(Collectors.toList());
+    return HoodieListData.eager(results);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7ba63278497e..960e45f16f7e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -417,6 +417,18 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Enables data-skipping allowing queries to leverage 
indexes to reduce the search space by "
           + "skipping over files");
 
+  @AdvancedConfig
+  public static final ConfigOption<Integer> 
READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM = ConfigOptions
+      .key("read.data.skipping.rli.keys.max.num")
+      .intType()
+      .defaultValue(8)
+      .withDescription("Record Level index statistics will be read from 
metadata table (MDT) for data skipping optimization,\n"
+          + "and currently the index statistics are collected by a single 
process. This config is used to constrain the maximum \n"
+          + " number of hoodie keys that can be read from MDT without 
sacrificing any performance. If the number of hoodie keys from query\n"
+          + "predicate is greater than the maximum value, the query will 
fallback to skip the record level index filtering.\n"
+          + "E.g., given query: SELECT * FROM T WHERE `uuid` IN 
(1,2,3,4,5,6,7,8,9), the number of hoodie keys is 9, and\n"
+          + "the maximum value is 8, so the source will not perform record 
level index filtering.");
+
   // ------------------------------------------------------------------------
   //  Write Options
   // ------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 8f0a213040a7..866e0ffae773 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -221,7 +221,7 @@ public class RowDataKeyGen implements Serializable {
   }
 
   // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordKey
-  public static String getRecordKey(Object recordKeyValue, String 
recordKeyField,boolean consistentLogicalTimestampEnabled) {
+  public static String getRecordKey(Object recordKeyValue, String 
recordKeyField, boolean consistentLogicalTimestampEnabled) {
     recordKeyValue = getTimestampValue(consistentLogicalTimestampEnabled, 
recordKeyValue);
     String recordKey = StringUtils.objToString(recordKeyValue);
     if (recordKey == null || recordKey.isEmpty()) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
index d6a472b2b7ad..be51a1391f4b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionEvaluators.java
@@ -194,6 +194,10 @@ public class ExpressionEvaluators {
           "Can not find column " + this.name);
       return columnStats;
     }
+
+    public String getName() {
+      return this.name;
+    }
   }
 
   /**
@@ -243,6 +247,10 @@ public class ExpressionEvaluators {
       }
       return compare(maxVal, val, type) >= 0;
     }
+
+    public Object getVal() {
+      return this.val;
+    }
   }
 
   /**
@@ -424,6 +432,10 @@ public class ExpressionEvaluators {
     public void bindVals(Object... vals) {
       this.vals = vals;
     }
+
+    public Object[] getVals() {
+      return this.vals;
+    }
   }
 
   /**
@@ -521,6 +533,10 @@ public class ExpressionEvaluators {
       this.evaluators = evaluators;
       return this;
     }
+
+    public Evaluator[] getEvaluators() {
+      return this.evaluators;
+    }
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index adae1d8df614..d44695b7cbd8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -23,12 +23,14 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.stats.FileStatsIndex;
+import org.apache.hudi.source.stats.RecordLevelIndex;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.util.StreamerUtil;
@@ -57,7 +59,7 @@ import java.util.stream.Collectors;
  *
  * <p>It caches the partition paths to avoid redundant look up.
  */
-public class FileIndex implements Serializable {
+public class FileIndex implements Serializable, AutoCloseable {
   private static final long serialVersionUID = 1L;
 
   private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
@@ -66,11 +68,12 @@ public class FileIndex implements Serializable {
   private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
   private final org.apache.hadoop.conf.Configuration hadoopConf;
-  private final PartitionPruners.PartitionPruner partitionPruner;  // for 
partition pruning
+  private final Option<PartitionPruners.PartitionPruner> partitionPruner;  // 
for partition pruning
   private final ColumnStatsProbe colStatsProbe;                    // for 
probing column stats
   private final Function<String, Integer> partitionBucketIdFunc;   // for 
bucket pruning
   private List<String> partitionPaths;                             // cache of 
partition paths
   private final FileStatsIndex fileStatsIndex;                     // for data 
skipping
+  private final Option<RecordLevelIndex> recordLevelIndex;
   private final HoodieTableMetaClient metaClient;
 
   private FileIndex(
@@ -86,9 +89,11 @@ public class FileIndex implements Serializable {
     this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
     this.metadataConfig = StreamerUtil.metadataConfig(conf);
     this.colStatsProbe = 
isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? 
colStatsProbe : null;
-    this.partitionPruner = partitionPruner;
+    this.partitionPruner = Option.ofNullable(partitionPruner);
     this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, conf, 
metaClient);
     this.partitionBucketIdFunc = partitionBucketIdFunc;
+    List<ExpressionEvaluators.Evaluator> evaluators = 
Option.ofNullable(colStatsProbe).map(ColumnStatsProbe::getEvaluators).orElse(Collections.emptyList());
+    this.recordLevelIndex = RecordLevelIndex.create(path.toString(), conf, 
metaClient, evaluators, rowType);
     this.metaClient = metaClient;
   }
 
@@ -184,8 +189,15 @@ public class FileIndex implements Serializable {
       return Collections.emptyList();
     }
 
+    // data skipping based on record index
+    if (recordLevelIndex.isPresent()) {
+      int prevSize = filteredFileSlices.size();
+      filteredFileSlices = 
recordLevelIndex.get().computeCandidateFileSlices(filteredFileSlices);
+      logPruningMsg(prevSize, filteredFileSlices.size(), "record level index 
pruning");
+    }
+
     // data skipping based on column stats
-    List<String> allFiles = 
fileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList());
+    List<String> allFiles = 
filteredFileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList());
     Set<String> candidateFiles = 
fileStatsIndex.computeCandidateFiles(colStatsProbe, allFiles);
     if (candidateFiles == null) {
       // no need to filter by col stats or error occurs.
@@ -237,12 +249,7 @@ public class FileIndex implements Serializable {
     }
     List<String> allPartitionPaths = this.tableExists ? 
FSUtils.getAllPartitionPaths(new HoodieFlinkEngineContext(hadoopConf), 
metaClient, metadataConfig)
         : Collections.emptyList();
-    if (this.partitionPruner == null) {
-      this.partitionPaths = allPartitionPaths;
-    } else {
-      Set<String> prunedPartitionPaths = 
this.partitionPruner.filter(allPartitionPaths);
-      this.partitionPaths = new ArrayList<>(prunedPartitionPaths);
-    }
+    this.partitionPaths = partitionPruner.map(pruner -> 
pruner.filter(allPartitionPaths).stream().collect(Collectors.toList())).orElse(allPartitionPaths);
     return this.partitionPaths;
   }
 
@@ -278,6 +285,13 @@ public class FileIndex implements Serializable {
     return (total - left) / total;
   }
 
+  @Override
+  public void close() {
+    this.fileStatsIndex.close();
+    this.recordLevelIndex.ifPresent(RecordLevelIndex::close);
+    this.partitionPruner.ifPresent(PartitionPruners.PartitionPruner::close);
+  }
+
   // -------------------------------------------------------------------------
   //  Inner class
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
index a4b481940027..11a71b9c576e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
@@ -73,6 +73,10 @@ public class ColumnStatsProbe implements Serializable {
     return referencedCols;
   }
 
+  public List<ExpressionEvaluators.Evaluator> getEvaluators() {
+    return evaluators;
+  }
+
   @Nullable
   public static ColumnStatsProbe newInstance(List<ResolvedExpression> filters) 
{
     if (filters.isEmpty()) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
index f80ba96bda88..492117515a59 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
@@ -50,12 +50,16 @@ import java.util.stream.Stream;
  */
 public class PartitionPruners {
 
-  public interface PartitionPruner extends Serializable {
+  public interface PartitionPruner extends Serializable, AutoCloseable {
 
     /**
      * Applies partition pruning on the given partition list, return remained 
partitions.
      */
     Set<String> filter(Collection<String> partitions);
+
+    default void close() {
+      // do nothing.
+    }
   }
 
   /**
@@ -163,6 +167,11 @@ public class PartitionPruners {
       }
       return 
partitions.stream().filter(candidatePartitions::contains).collect(Collectors.toSet());
     }
+
+    @Override
+    public void close() {
+      this.partitionStatsIndex.close();
+    }
   }
 
   /**
@@ -183,6 +192,11 @@ public class PartitionPruners {
       }
       return new HashSet<>(partitions);
     }
+
+    @Override
+    public void close() {
+      pruners.forEach(PartitionPruner::close);
+    }
   }
 
   public static Builder builder() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
index 1d73e167e33e..d54df6dc13ff 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
@@ -20,7 +20,6 @@ package org.apache.hudi.source.stats;
 
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.Set;
 
@@ -28,13 +27,7 @@ import java.util.Set;
  * Base support that leverages Metadata Table's indexes, such as Column Stats 
Index
  * and Partition Stats Index, to prune files and partitions.
  */
-public interface ColumnStatsIndex extends Serializable {
-
-  /**
-   * Returns the partition name of the index.
-   */
-  String getIndexPartitionName();
-
+public interface ColumnStatsIndex extends FlinkMetadataIndex {
   /**
    * Computes the filtered files with given candidates.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index b19f4dcb155e..2af046d33300 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -84,7 +84,7 @@ public class FileStatsIndex implements ColumnStatsIndex {
   private final RowType rowType;
   private final String basePath;
   private final Configuration conf;
-  private HoodieTableMetaClient metaClient;
+  protected HoodieTableMetaClient metaClient;
   private HoodieTableMetadata metadataTable;
 
   public FileStatsIndex(
@@ -103,28 +103,34 @@ public class FileStatsIndex implements ColumnStatsIndex {
     return HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
   }
 
+  @Override
+  public boolean isIndexAvailable() {
+    return getMetaClient().getTableConfig().isMetadataTableAvailable()
+        && 
getMetaClient().getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+  }
+
   public HoodieTableMetadata getMetadataTable() {
     // initialize the metadata table lazily
     if (this.metadataTable == null) {
-      initMetaClient();
-      this.metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
+      this.metadataTable = 
getMetaClient().getTableFormat().getMetadataFactory().create(
           HoodieFlinkEngineContext.DEFAULT,
-          metaClient.getStorage(),
+          getMetaClient().getStorage(),
           StreamerUtil.metadataConfig(conf),
           basePath);
     }
     return this.metadataTable;
   }
 
-  private void initMetaClient() {
+  protected HoodieTableMetaClient getMetaClient() {
     if (this.metaClient == null) {
       this.metaClient = StreamerUtil.createMetaClient(conf);
     }
+    return this.metaClient;
   }
 
   @Override
   public Set<String> computeCandidateFiles(ColumnStatsProbe probe, 
List<String> allFiles) {
-    if (probe == null) {
+    if (probe == null || !isIndexAvailable()) {
       return null;
     }
     try {
@@ -414,4 +420,16 @@ public class FileStatsIndex implements ColumnStatsIndex {
     ).collect(Collectors.toList());
     return projectNestedColStatsColumns(rows);
   }
+
+  @Override
+  public void close() {
+    if (this.metadataTable == null) {
+      return;
+    }
+    try {
+      this.metadataTable.close();
+    } catch (Exception e) {
+      throw new HoodieException("Exception happened during close metadata 
table.", e);
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FlinkMetadataIndex.java
similarity index 50%
copy from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FlinkMetadataIndex.java
index 1d73e167e33e..0911d43ede10 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FlinkMetadataIndex.java
@@ -18,40 +18,20 @@
 
 package org.apache.hudi.source.stats;
 
-import org.apache.hudi.source.prune.ColumnStatsProbe;
-
 import java.io.Serializable;
-import java.util.List;
-import java.util.Set;
 
 /**
- * Base support that leverages Metadata Table's indexes, such as Column Stats 
Index
- * and Partition Stats Index, to prune files and partitions.
+ * Base support that leverages Metadata Table's indexes, such as Column Stats 
Index,
+ * Partition Stats Index and Record Level Index, to prune files, file slices and 
partitions.
  */
-public interface ColumnStatsIndex extends Serializable {
-
+public interface FlinkMetadataIndex extends Serializable, AutoCloseable {
   /**
    * Returns the partition name of the index.
    */
   String getIndexPartitionName();
 
   /**
-   * Computes the filtered files with given candidates.
-   *
-   * @param columnStatsProbe The utility to filter the column stats metadata.
-   * @param allFile          The file name list of the candidate files.
-   *
-   * @return The set of filtered file names
-   */
-  Set<String> computeCandidateFiles(ColumnStatsProbe columnStatsProbe, 
List<String> allFile);
-
-  /**
-   * Computes the filtered partition paths with given candidates.
-   *
-   * @param columnStatsProbe The utility to filter the column stats metadata.
-   * @param allPartitions    The <strong>relative</strong> partition path list 
of the candidate partitions.
-   *
-   * @return The set of filtered relative partition paths
+   * Returns whether the index's metadata table partition is available for the current table.
    */
-  Set<String> computeCandidatePartitions(ColumnStatsProbe columnStatsProbe, 
List<String> allPartitions);
+  boolean isIndexAvailable();
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
index 1facae5671d9..4ab47c7ef28f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
@@ -50,6 +50,12 @@ public class PartitionStatsIndex extends FileStatsIndex {
     return HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
   }
 
+  @Override
+  public boolean isIndexAvailable() {
+    return getMetaClient().getTableConfig().isMetadataTableAvailable()
+        && 
getMetaClient().getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS);
+  }
+
   @Override
   public Set<String> computeCandidateFiles(ColumnStatsProbe probe, 
List<String> allFiles) {
     throw new UnsupportedOperationException("This method is not supported by " 
+ this.getClass().getSimpleName());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
new file mode 100644
index 000000000000..2b0d21baf047
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/RecordLevelIndex.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An index support implementation that leverages Record Level Index to prune 
file slices.
+ */
+public class RecordLevelIndex implements FlinkMetadataIndex {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(RecordLevelIndex.class);
+
+  private final String basePath;
+  private final Configuration conf;
+  private final List<String> hoodieKeysFromFilter;
+  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetadata metadataTable;
+
+  private RecordLevelIndex(
+      String basePath,
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      List<String> hoodieKeysFromFilter) {
+    this.basePath = basePath;
+    this.conf = conf;
+    this.metaClient = metaClient;
+    this.hoodieKeysFromFilter = hoodieKeysFromFilter;
+  }
+
+  @Override
+  public String getIndexPartitionName() {
+    return HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX;
+  }
+
+  @Override
+  public boolean isIndexAvailable() {
+    return metaClient.getTableConfig().isMetadataTableAvailable()
+        && 
metaClient.getTableConfig().getMetadataPartitions().contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+  }
+
+  public HoodieTableMetadata getMetadataTable() {
+    // initialize the metadata table lazily
+    if (this.metadataTable == null) {
+      this.metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
+          HoodieFlinkEngineContext.DEFAULT,
+          metaClient.getStorage(),
+          StreamerUtil.metadataConfig(conf),
+          basePath);
+    }
+    return this.metadataTable;
+  }
+
+  public List<FileSlice> computeCandidateFileSlices(List<FileSlice> 
fileSlices) {
+    if (!isIndexAvailable()) {
+      return fileSlices;
+    }
+    HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+        
getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(hoodieKeysFromFilter));
+    try {
+      List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = 
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+      Set<String> fileIds = recordIndexLocations.stream()
+          .map(pair -> 
pair.getValue().getFileId()).collect(Collectors.toSet());
+      return fileSlices.stream().filter(fileSlice -> 
fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
+    } finally {
+      // Release any persisted data backing the record index lookup to avoid memory leaks
+      recordIndexData.unpersistWithDependencies();
+    }
+  }
+
+  public static Option<RecordLevelIndex> create(
+      String basePath,
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      List<ExpressionEvaluators.Evaluator> evaluators,
+      RowType rowType) {
+    if (evaluators.isEmpty() || 
!FlinkOptions.QUERY_TYPE_SNAPSHOT.equalsIgnoreCase(conf.get(FlinkOptions.QUERY_TYPE)))
 {
+      return Option.empty();
+    }
+    if (metaClient == null) {
+      metaClient = StreamerUtil.createMetaClient(conf);
+    }
+    // disallow RLI for new encoding with complex key gen when the table 
version is lower than NINE.
+    if 
(KeyGenUtils.mayUseNewEncodingForComplexKeyGen(metaClient.getTableConfig())) {
+      return Option.empty();
+    }
+
+    String[] recordKeyFields = 
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
+    if (recordKeyFields.length == 0) {
+      LOG.warn("The table do not have record keys, skipping the rli pruning.");
+      return Option.empty();
+    }
+    boolean consistentLogicalTimestampEnabled = 
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
+    List<String> hoodieKeysFromFilter = computeHoodieKeyFromFilters(conf, 
metaClient, evaluators, recordKeyFields, rowType, 
consistentLogicalTimestampEnabled);
+    if (hoodieKeysFromFilter.isEmpty()) {
+      LOG.warn("The number of keys from query predicate is empty, skipping the 
rli pruning.");
+      return Option.empty();
+    }
+    int maxKeyNum = conf.get(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM);
+    if (hoodieKeysFromFilter.size() > maxKeyNum) {
+      LOG.warn("The number of keys from query predicate: {} exceeds the upper 
threshold: {}, skipping the rli pruning, the keys: {}",
+          hoodieKeysFromFilter.size(), maxKeyNum, hoodieKeysFromFilter);
+      return Option.empty();
+    }
+    return Option.of(new RecordLevelIndex(basePath, conf, metaClient, 
hoodieKeysFromFilter));
+  }
+
+  /**
+   * Given query filters, it filters the EqualTo, IN and OR queries on record 
key columns and
+   * returns the list of record key literals present in the query, for example:
+   * <p>
+   * filter1: `key1` = 'val1', returns {"val1"}
+   * filter2: `key1` in ('val1', 'val2', 'val3'), returns {"val1", "val2", 
"val3"}
+   * filter3: `key1` = 'val1' OR `key1` = 'val2' OR `key1` = 'val3', returns 
{"val1", "val2", "val3"}
+   * filter4: `key1` = 'val1' AND `key2` in ('val2', 'val3'), returns 
{"key1:val1,key2:val2", "key1:val1,key2:val3"}
+   */
+  @VisibleForTesting
+  public static List<String> computeHoodieKeyFromFilters(
+      Configuration conf,
+      HoodieTableMetaClient metaClient,
+      List<ExpressionEvaluators.Evaluator> evaluators,
+      String[] keyFields,
+      RowType rowType,
+      boolean consistentLogicalTimestampEnabled) {
+    String[] partitionFields = 
metaClient.getTableConfig().getPartitionFields().orElse(new String[0]);
+    // align with the check logic in RowDataKeyGen
+    boolean isComplexRecordKey = keyFields.length > 1 || 
partitionFields.length > 1 && 
!OptionsResolver.useComplexKeygenNewEncoding(conf);
+    List<String> hoodieKeys = new ArrayList<>();
+    List<String> fieldNames = rowType.getFieldNames();
+    for (String keyField: keyFields) {
+      List<String> recordKeys = new ArrayList<>();
+      LogicalType fieldType = rowType.getTypeAt(fieldNames.indexOf(keyField));
+      for (ExpressionEvaluators.Evaluator evaluator: evaluators) {
+        // if there exists multiple ref fields in an evaluator, ignore this 
evaluator, e.g., key = 'key1' or age = 20
+        List<Object> literals = collectLiterals(evaluator, keyField);
+        literals.forEach(val -> recordKeys.add(isComplexRecordKey
+            ? keyField + KeyGenerator.DEFAULT_COLUMN_VALUE_SEPARATOR + 
normalizeLiteral(val, keyField, fieldType, consistentLogicalTimestampEnabled)
+            : normalizeLiteral(val, keyField, fieldType, 
consistentLogicalTimestampEnabled)));
+      }
+      if (recordKeys.isEmpty()) {
+        LOG.info("No literals found for the record key: {}, therefore 
filtering can not be performed", keyField);
+        return Collections.emptyList();
+      } else if (!isComplexRecordKey || hoodieKeys.isEmpty()) {
+        hoodieKeys = recordKeys;
+      } else {
+        // Combine literals for this configured record key with literals for 
the other configured record keys
+        // If there are two literals for rk1, rk2, rk3 each. A total of 8 
combinations will be generated
+        List<String> tmpHoodieKeys = new ArrayList<>();
+        for (String compositeKey: hoodieKeys) {
+          for (String recordKey: recordKeys) {
+            tmpHoodieKeys.add(compositeKey + 
KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + recordKey);
+          }
+        }
+        hoodieKeys = tmpHoodieKeys;
+      }
+    }
+    return hoodieKeys;
+  }
+
+  /**
+   * Collects the literals that {@code refName} is restricted to by {@code evaluator} (EqualTo, IN, or an OR of those); returns an empty list when no such restriction can be derived, e.g. for a null literal or an OR branch on another field.
+   */
+  private static List<Object> collectLiterals(ExpressionEvaluators.Evaluator 
evaluator, String refName) {
+    if (evaluator instanceof ExpressionEvaluators.LeafEvaluator
+        && !((ExpressionEvaluators.LeafEvaluator) 
evaluator).getName().equalsIgnoreCase(refName)) {
+      return Collections.emptyList();
+    }
+    if (evaluator instanceof ExpressionEvaluators.EqualTo) {
+      Object valueLiteral = ((ExpressionEvaluators.EqualTo) 
evaluator).getVal();
+      return valueLiteral == null ? Collections.emptyList() : 
Collections.singletonList(valueLiteral);
+    } else if (evaluator instanceof ExpressionEvaluators.In) {
+      Object[] valueLiterals = ((ExpressionEvaluators.In) evaluator).getVals();
+      if (valueLiterals.length < 1 || 
Arrays.stream(valueLiterals).anyMatch(Objects::isNull)) {
+        return Collections.emptyList();
+      }
+      return Arrays.stream(valueLiterals).collect(Collectors.toList());
+    } else if (evaluator instanceof ExpressionEvaluators.Or) {
+      List<List<Object>> literalsList = 
Arrays.stream(((ExpressionEvaluators.Or) evaluator).getEvaluators())
+          .map(eval -> collectLiterals(eval, 
refName)).collect(Collectors.toList());
+      // if any OR branch does not restrict the key, the OR as a whole does not either, so return an empty list
+      if (literalsList.stream().anyMatch(List::isEmpty)) {
+        return Collections.emptyList();
+      }
+      return 
literalsList.stream().flatMap(List::stream).distinct().collect(Collectors.toList());
+    } else { // other evaluator types do not yield exact key literals
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Normalizes a predicate literal into the string form that {@code RowDataKeyGen} produces for the record key on the write side, so that it can be looked up in the record index.
+   */
+  private static String normalizeLiteral(Object value, String keyField, 
LogicalType fieldType, boolean consistentLogicalTimestampEnabled) {
+    switch (fieldType.getTypeRoot()) {
+      case DECIMAL:
+        // the scale of the predicate literal may be smaller than the column scale used in the record index; pad with zeros,
+        // e.g. 1.11 with target scale 5 becomes 1.11000 (a literal whose scale is already >= the target is kept as is)
+        BigDecimal decimal = (BigDecimal) value;
+        int targetScale = ((DecimalType) fieldType).getScale();
+        value = decimal.scale() >= targetScale ? value : 
decimal.setScale(targetScale);
+        break;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        // the value was extracted from the literal by ExpressionUtils#getValueFromLiteral as epoch millis;
+        // convert it back to TimestampData so that the key generating logic of RowDataKeyGen can be reused.
+        value = TimestampData.fromEpochMillis((Long) value);
+        break;
+      default:
+        break;
+    }
+    // reuse the writer-side key generation so the result matches the keys stored in the record index.
+    return RowDataKeyGen.getRecordKey(value, keyField, 
consistentLogicalTimestampEnabled);
+  }
+
+  @Override
+  public void close() { // releases the metadata table, if one was set
+    if (this.metadataTable == null) { // nothing to release
+      return;
+    }
+    try {
+      this.metadataTable.close();
+    } catch (Exception e) { // rethrown as an unchecked HoodieException that keeps the original cause
+      throw new HoodieException("Exception happened during close metadata 
table.", e);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 37e5bbbfece0..d78fb9dd4856 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -391,42 +391,45 @@ public class HoodieTableSource implements
   }
 
   private List<MergeOnReadInputSplit> buildInputSplits() {
-    FileIndex fileIndex = getOrBuildFileIndex();
-    List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
-    if (relPartitionPaths.isEmpty()) {
-      return Collections.emptyList();
-    }
-    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-    if (pathInfoList.isEmpty()) {
-      throw new HoodieException("No files found for reading in user provided 
path.");
-    }
-
-    String latestCommit;
-    List<FileSlice> allFileSlices;
-    // file-slice after pending compaction-requested instant-time is also 
considered valid
-    try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
-        metaClient, 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
 pathInfoList)) {
-      if (!fsView.getLastInstant().isPresent()) {
+    try (FileIndex fileIndex = getOrBuildFileIndex()) { // closed once the splits are built; this local shadows the field this.fileIndex
+      List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+      if (relPartitionPaths.isEmpty()) {
         return Collections.emptyList();
       }
-      latestCommit = fsView.getLastInstant().get().requestedTime();
-      allFileSlices = relPartitionPaths.stream()
-          .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par, 
latestCommit)).collect(Collectors.toList());
+      List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+      if (pathInfoList.isEmpty()) {
+        throw new HoodieException("No files found for reading in user provided 
path.");
+      }
+
+      String latestCommit;
+      List<FileSlice> allFileSlices;
+      // file slices after a pending compaction-requested instant time are also considered valid
+      try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+          metaClient, 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
 pathInfoList)) {
+        if (!fsView.getLastInstant().isPresent()) {
+          return Collections.emptyList();
+        }
+        latestCommit = fsView.getLastInstant().get().requestedTime();
+        allFileSlices = relPartitionPaths.stream()
+            .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par, 
latestCommit)).collect(Collectors.toList());
+      }
+      List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
+
+      final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
+      final AtomicInteger cnt = new AtomicInteger(0);
+      // generates one input split for each file group
+      return fileSlices.stream().map(fileSlice -> {
+        String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+        Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(logFile -> logFile.getPath().toString())
+            .collect(Collectors.toList()));
+        return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, 
latestCommit,
+            metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, 
mergeType, null, fileSlice.getFileId());
+      }).collect(Collectors.toList());
+    } finally {
+      this.fileIndex = null; // NOTE(review): presumably drops the cached index closed above so that getOrBuildFileIndex() builds a fresh one; confirm
     }
-    List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
-
-    final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
-    final AtomicInteger cnt = new AtomicInteger(0);
-    // generates one input split for each file group
-    return fileSlices.stream().map(fileSlice -> {
-      String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-      Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-          .sorted(HoodieLogFile.getLogFileComparator())
-          .map(logFile -> logFile.getPath().toString())
-          .collect(Collectors.toList()));
-      return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, 
latestCommit,
-          metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, 
mergeType, null, fileSlice.getFileId());
-    }).collect(Collectors.toList());
   }
 
   public InputFormat<RowData, ?> getInputFormat() {
@@ -683,19 +686,23 @@ public class HoodieTableSource implements
    */
   @VisibleForTesting
   public List<FileSlice> getBaseFileOnlyFileSlices() {
-    List<String> relPartitionPaths = getReadPartitions();
-    if (relPartitionPaths.isEmpty()) {
-      return Collections.emptyList();
-    }
-    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-    try (HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
-        
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
 pathInfoList)) {
-
-      List<FileSlice> allFileSlices = relPartitionPaths.stream()
-          .flatMap(par -> fsView.getLatestBaseFiles(par)
-              .map(baseFile -> new FileSlice(new HoodieFileGroupId(par, 
baseFile.getFileId()), baseFile.getCommitTime(), baseFile, 
Collections.emptyList())))
-          .collect(Collectors.toList());
-      return fileIndex.filterFileSlices(allFileSlices);
+    try (FileIndex fileIndex = getOrBuildFileIndex())  { // closed once the slices are filtered; this local shadows the field this.fileIndex
+      List<String> relPartitionPaths = getReadPartitions(); // NOTE(review): buildInputSplits uses fileIndex.getOrBuildPartitionPaths() here instead; confirm both resolve against the same index
+      if (relPartitionPaths.isEmpty()) {
+        return Collections.emptyList();
+      }
+      List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+      try (HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
+          
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
 pathInfoList)) {
+
+        List<FileSlice> allFileSlices = relPartitionPaths.stream()
+            .flatMap(par -> fsView.getLatestBaseFiles(par)
+                .map(baseFile -> new FileSlice(new HoodieFileGroupId(par, 
baseFile.getFileId()), baseFile.getCommitTime(), baseFile, 
Collections.emptyList())))
+            .collect(Collectors.toList());
+        return fileIndex.filterFileSlices(allFileSlices);
+      }
+    } finally {
+      this.fileIndex = null; // NOTE(review): presumably drops the cached index closed above so that getOrBuildFileIndex() builds a fresh one; confirm
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index fab10d7fd74c..ae7aaa022df3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -44,19 +44,30 @@ import 
org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.configuration.FlinkOptions.HIVE_STYLE_PARTITIONING;
 import static org.apache.hudi.configuration.FlinkOptions.KEYGEN_CLASS_NAME;
@@ -217,6 +228,211 @@ public class TestFileIndex {
     assertEquals(Arrays.asList("par3"), p);
   }
 
+  @ParameterizedTest
+  @MethodSource("filtersAndResults")
+  void testFileListingWithRecordLevelIndex(String recordFields, 
ColumnStatsProbe probe, int maxKeyCnt, int expectedCnt) throws Exception {
+    DataType dataType = TestConfigurations.ROW_DATA_TYPE_WITH_ATOMIC_TYPES;
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), dataType);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, recordFields);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_RLI_KEYS_MAX_NUM, maxKeyCnt); // record index pruning is skipped above this many candidate keys
+    // Enable record level index specifically for this test
+    
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
+
+    // Write test data
+    TestData.writeData(TestData.DATA_SET_WITH_ATOMIC_TYPES, conf);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    // Build a file index whose column stats probe carries the record key predicate, so that record-level index pruning can apply
+    FileIndex fileIndex =
+        FileIndex.builder()
+            .path(new StoragePath(tempFile.getAbsolutePath()))
+            .conf(conf)
+            .rowType((RowType) dataType.getLogicalType())
+            .metaClient(metaClient)
+            .columnStatsProbe(probe)
+            .build();
+
+    // Get filtered file slices; a usable record key predicate within the key-number limit prunes by the record-level index
+    List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex); // NOTE(review): FileIndex is AutoCloseable but is not closed here
+    assertThat(fileSlices.size(), is(expectedCnt));
+  }
+
+  private static Stream<Arguments> filtersAndResults() {
+    CallExpression equalTinyInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_tinyint", DataTypes.TINYINT(), 0, 
0),
+            new ValueLiteralExpression((byte) 1, DataTypes.TINYINT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalSmallInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_smallint", DataTypes.SMALLINT(), 
0, 0),
+            new ValueLiteralExpression((short) 11, 
DataTypes.SMALLINT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0),
+            new ValueLiteralExpression(111, DataTypes.INT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalBigInt = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_bigint", DataTypes.BIGINT(), 0, 0),
+            new ValueLiteralExpression(1111L, DataTypes.BIGINT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalFloat = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_float", DataTypes.FLOAT(), 0, 0),
+            new ValueLiteralExpression(10.11f, DataTypes.FLOAT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression equalDouble = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_double", DataTypes.DOUBLE(), 0, 0),
+            new ValueLiteralExpression(11.111, DataTypes.DOUBLE().notNull())
+        ),
+        DataTypes.BOOLEAN());
+
+    CallExpression equalExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_str", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("str1", DataTypes.STRING().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression inExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(
+            new FieldReferenceExpression("f_str", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("str2", DataTypes.STRING().notNull()),
+            new ValueLiteralExpression("str3", DataTypes.STRING().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression orExpression = 
CallExpression.permanent(BuiltInFunctionDefinitions.OR, Arrays.asList(inExpr, 
equalExpr), DataTypes.BOOLEAN());
+
+    CallExpression equalExpr1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0),
+            new ValueLiteralExpression(111, DataTypes.INT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    CallExpression inExpr1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(
+            new FieldReferenceExpression("f_int", DataTypes.INT(), 0, 0),
+            new ValueLiteralExpression(333, DataTypes.INT().notNull()),
+            new ValueLiteralExpression(222, DataTypes.INT().notNull())
+        ),
+        DataTypes.BOOLEAN());
+
+    // record key predicate with EQUALS (f_str = 'str1'), number of filtered file slices is 1.
+    ColumnStatsProbe probe1 = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalExpr));
+    // record key predicate with IN (f_str in ('str2', 'str3')), number of filtered file slices is 2.
+    ColumnStatsProbe probe2 = 
ColumnStatsProbe.newInstance(Collections.singletonList(inExpr));
+    // record key predicate with OR (f_str in ('str2', 'str3') or f_str = 'str1'), number of filtered file slices is 3.
+    ColumnStatsProbe probe3 = 
ColumnStatsProbe.newInstance(Collections.singletonList(orExpression));
+
+    // predicates on both fields of the composite record key (f_str, f_int)
+    // f_str = 'str1' and f_int in (333, 222), number of filtered file slices is 0.
+    ColumnStatsProbe probe4 = 
ColumnStatsProbe.newInstance(Arrays.asList(equalExpr, inExpr1));
+    // f_str = 'str1' and f_int = 111, number of filtered file slices is 1.
+    ColumnStatsProbe probe5 = 
ColumnStatsProbe.newInstance(Arrays.asList(equalExpr, equalExpr1));
+    // f_str in ('str2', 'str3') and f_int in (333, 222), number of filtered file slices is 2.
+    ColumnStatsProbe probe6 = 
ColumnStatsProbe.newInstance(Arrays.asList(inExpr, inExpr1));
+
+    ColumnStatsProbe probeTinyInt = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalTinyInt));
+    ColumnStatsProbe probeSmallInt = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalSmallInt));
+    ColumnStatsProbe probeInt = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalInt));
+    ColumnStatsProbe probeBigInt = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalBigInt));
+    ColumnStatsProbe probeFloat = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalFloat));
+    ColumnStatsProbe probeDouble = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalDouble));
+
+    // TIMESTAMP record key: f_timestamp = 1970-01-01T00:00:00.001 (epoch millis 1, UTC)
+    CallExpression equalExprTimestamp = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_timestamp", 
DataTypes.TIMESTAMP(3), 0, 0),
+            new 
ValueLiteralExpression(LocalDateTime.ofInstant(Instant.ofEpochMilli(1), 
ZoneId.of("UTC")), DataTypes.TIMESTAMP(3).notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeTimestamp = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalExprTimestamp));
+
+    // TIME record key: f_time = 00:00:01
+    CallExpression equalExprTime = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_time", DataTypes.TIME(), 0, 0),
+            new ValueLiteralExpression(LocalTime.ofSecondOfDay(1), 
DataTypes.TIME().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeTime = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalExprTime));
+
+    // DATE record key: f_date = 1970-01-02 (epoch day 1)
+    CallExpression equalExprDate = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_date", DataTypes.DATE(), 0, 0),
+            new ValueLiteralExpression(LocalDate.ofEpochDay(1), 
DataTypes.DATE().notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeDate = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalExprDate));
+
+    // DECIMAL record key: the literal 1.11 has scale 2, which may need zero padding to the column scale (see RecordLevelIndex#normalizeLiteral)
+    CallExpression equalExprDecimal = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(
+            new FieldReferenceExpression("f_decimal", DataTypes.DECIMAL(38, 
18), 2, 2),
+            new ValueLiteralExpression(new BigDecimal("1.11"), 
DataTypes.DECIMAL(38, 18).notNull())
+        ),
+        DataTypes.BOOLEAN());
+    ColumnStatsProbe probeDecimal = 
ColumnStatsProbe.newInstance(Collections.singletonList(equalExprDecimal));
+
+    Object[][] data = new Object[][] {
+        {"f_str", probe1, 8, 1},
+        {"f_str", probe2, 8, 2},
+        {"f_str", probe3, 8, 3},
+        {"f_str,f_int", probe4, 8, 0},
+        {"f_str,f_int", probe5, 8, 1},
+        {"f_str,f_int", probe6, 8, 2},
+        // record key f_str with f_str in ('str2', 'str3') infers 2 hoodie keys, which exceeds the configured max number (1) of hoodie keys
+        // for record index, thus fall back to not using record index (with key 'f_str,f_int' no key would be inferred as f_int is unconstrained).
+        {"f_str", probe2, 1, 3},
+        // key type is TINYINT
+        {"f_tinyint", probeTinyInt, 8, 1},
+        // key type is SMALLINT
+        {"f_smallint", probeSmallInt, 8, 1},
+        // key type is INT
+        {"f_int", probeInt, 8, 1},
+        // key type is BIGINT
+        {"f_bigint", probeBigInt, 8, 1},
+        // key type is FLOAT
+        {"f_float", probeFloat, 8, 1},
+        // key type is DOUBLE
+        {"f_double", probeDouble, 8, 1},
+        // key type is TIMESTAMP
+        {"f_timestamp", probeTimestamp, 8, 1},
+        // key type is TIME
+        {"f_time", probeTime, 8, 1},
+        // key type is DATE
+        {"f_date", probeDate, 8, 1},
+        // key type is DECIMAL
+        {"f_decimal", probeDecimal, 8, 1},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private List<FileSlice> getFilteredFileSlices(HoodieTableMetaClient 
metaClient, FileIndex fileIndex) {
     List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
     List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index fb9275c2fc35..31343c8e3af2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -40,6 +40,7 @@ import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -366,8 +367,8 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
       // which will be used to construct partition stats then.
       
conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
"true");
     }
-    metaClient = HoodieTestUtils.init(basePath, tableType);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    metaClient = StreamerUtil.createMetaClient(conf);
 
     // uuid > 'id5' and age < 30, only column stats of 'par3' matches the 
filter.
     ColumnStatsProbe columnStatsProbe =
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
new file mode 100644
index 000000000000..a23a5247139c
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestRecordLevelIndex.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.utils.TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link RecordLevelIndex}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestRecordLevelIndex {
+  private final HoodieTableMetaClient metaClient = 
mock(HoodieTableMetaClient.class); // stubbed in each test
+  private final HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+  private static final DataType ROW_DATA_TYPE_MULTI_KEYS = DataTypes.ROW( // schema for the composite (key1, key2) record key test
+          DataTypes.FIELD("key1", DataTypes.VARCHAR(20)),// record key, 1st part
+          DataTypes.FIELD("key2", DataTypes.VARCHAR(20)),// record key, 2nd part
+          DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+          DataTypes.FIELD("age", DataTypes.INT()),
+          DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+          DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+      .notNull();
+  private static final RowType ROW_TYPE_MULTI_KEYS = (RowType) 
ROW_DATA_TYPE_MULTI_KEYS.getLogicalType();
+
+  private List<ExpressionEvaluators.Evaluator> 
createColumnStatsProbe(BuiltInFunctionDefinition func, String refName, 
List<String> vals) { // builds the evaluators of `refName <func> vals` (not a ColumnStatsProbe)
+    List<ResolvedExpression> args = vals.stream().map(
+        val -> new ValueLiteralExpression(val, DataTypes.STRING().notNull())).collect(Collectors.toCollection(java.util.ArrayList::new)); // explicit ArrayList: Collectors.toList() does not guarantee mutability, and the field reference is inserted below
+    args.add(0, new FieldReferenceExpression(refName, DataTypes.STRING(), 0, 
0));
+    CallExpression callExpression = CallExpression.permanent(func, args, 
DataTypes.BOOLEAN());
+    return 
ExpressionEvaluators.fromExpression(Collections.singletonList(callExpression));
+  }
+
+  private List<ExpressionEvaluators.Evaluator> createOrColumnStatsProbe(String 
refName, List<String> vals) {
+    // Builds `refName = vals[0] OR (refName = vals[1] OR refName = vals[2])`; only the first three values are used
+    CallExpression outerExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression(refName, 
DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression(vals.get(0), 
DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression leftExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression(refName, 
DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression(vals.get(1), 
DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression rightExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression(refName, 
DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression(vals.get(2), 
DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+
+    CallExpression orExpression = CallExpression.permanent( // vals[1] OR vals[2]
+        BuiltInFunctionDefinitions.OR,
+        Arrays.asList(leftExpr, rightExpr),
+        DataTypes.BOOLEAN());
+    CallExpression outerOrExpression = CallExpression.permanent( // vals[0] OR (vals[1] OR vals[2])
+        BuiltInFunctionDefinitions.OR,
+        Arrays.asList(outerExpr, orExpression),
+        DataTypes.BOOLEAN());
+    return 
ExpressionEvaluators.fromExpression(Collections.singletonList(outerOrExpression));
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithSimpleRecordKey() {
+    // Setup mock table config with single record key field (computeHoodieKeyFromFilters itself only reads the partition fields)
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new 
String[]{"partition"}));
+
+    Configuration conf = new Configuration();
+
+    // Test with EqualTo evaluator on record key; with one key field and one partition field the key is not prefixed by the field name
+    List<ExpressionEvaluators.Evaluator> evaluators = createColumnStatsProbe(
+        BuiltInFunctionDefinitions.EQUALS, "uuid", 
Collections.singletonList("id1"));
+    String[] recordKeyFields = {"uuid"};
+    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
+    assertEquals(Collections.singletonList("id1"), result, "Should return the 
simple record key value");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithInOperator() {
+    // Setup mock table config with single record key field
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new 
String[]{"partition"}));
+
+    Configuration conf = new Configuration();
+
+    // Test with IN operator: every IN value becomes a candidate key, in the order given
+    List<ExpressionEvaluators.Evaluator> evaluators = 
createColumnStatsProbe(BuiltInFunctionDefinitions.IN, "uuid", 
Arrays.asList("id1", "id2", "id3"));
+    String[] recordKeyFields = {"uuid"};
+    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
+    assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return 
all the IN operator values");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithOrOperator() {
+    // Setup mock table config with single record key field
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new 
String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new 
String[]{"partition"}));
+
+    Configuration conf = new Configuration();
+
+    // Test with a nested OR of EQUALS predicates on the record key: uuid = 'id1' OR (uuid = 'id2' OR uuid = 'id3')
+    List<ExpressionEvaluators.Evaluator> evaluators = 
createOrColumnStatsProbe("uuid", Arrays.asList("id1", "id2", "id3"));
+    String[] recordKeyFields = {"uuid"};
+    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, evaluators, recordKeyFields, 
TestConfigurations.ROW_TYPE, false);
+    // Note: the literals of every OR branch are collected, so all three values "id1", "id2" and "id3" are returned
+    assertEquals(Arrays.asList("id1", "id2", "id3"), result, "Should return 
values from OR operator");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithComplexRecordKey() {
+    // Setup mock table config with multiple record key fields
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"key1", "key2"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    Configuration conf = new Configuration();
+
+    // Test with one EQUALS filter on each record key field (key1 = 'val1' AND key2 = 'val2')
+    List<ResolvedExpression> expressions = Arrays.asList(
+        CallExpression.permanent(
+            BuiltInFunctionDefinitions.EQUALS,
+            Arrays.asList(new FieldReferenceExpression("key1", DataTypes.STRING(), 0, 0),
+                new ValueLiteralExpression("val1", DataTypes.STRING().notNull())),
+            DataTypes.BOOLEAN()),
+        CallExpression.permanent(
+            BuiltInFunctionDefinitions.EQUALS,
+            Arrays.asList(new FieldReferenceExpression("key2", DataTypes.STRING(), 0, 0),
+                new ValueLiteralExpression("val2", DataTypes.STRING().notNull())),
+            DataTypes.BOOLEAN())
+    );
+    String[] recordKeyFields = {"key1", "key2"};
+    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, ExpressionEvaluators.fromExpression(expressions), recordKeyFields, ROW_TYPE_MULTI_KEYS, false);
+    // For complex keys every part is rendered as "<field>:<value>" and the parts are joined with
+    // KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR, e.g. key1:val1,key2:val2
+    assertEquals(Collections.singletonList("key1:val1" + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "key2:val2"), result,
+        "Should return composite key with complex record keys");
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testComputeHoodieKeyFromFiltersWithSpecialTypeRecordKey(boolean consistentLogicalTimestampEnabled) {
+    // Setup mock table config with a composite record key made of a TIMESTAMP and a DECIMAL field
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"f_timestamp", "f_decimal"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    Configuration conf = new Configuration();
+    // Test with one EQUALS filter on each record key field
+    List<ResolvedExpression> expressions = Arrays.asList(
+        CallExpression.permanent(
+            BuiltInFunctionDefinitions.EQUALS,
+            Arrays.asList(
+                new FieldReferenceExpression("f_timestamp", DataTypes.TIMESTAMP(3), 0, 0),
+                new ValueLiteralExpression(LocalDateTime.ofInstant(Instant.ofEpochMilli(1), ZoneId.of("UTC")), DataTypes.TIMESTAMP(3).notNull())
+            ),
+            DataTypes.BOOLEAN()),
+        CallExpression.permanent(
+            BuiltInFunctionDefinitions.EQUALS,
+            Arrays.asList(
+                new FieldReferenceExpression("f_decimal", DataTypes.DECIMAL(3, 2), 0, 2),
+                new ValueLiteralExpression(new BigDecimal("1.1"), DataTypes.DECIMAL(3, 2).notNull())
+            ),
+            DataTypes.BOOLEAN())
+    );
+    String[] recordKeyFields = {"f_timestamp", "f_decimal"};
+    RowType rowType = (RowType) ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getLogicalType();
+    List<String> result = RecordLevelIndex.computeHoodieKeyFromFilters(
+        conf, metaClient, ExpressionEvaluators.fromExpression(expressions), recordKeyFields, rowType, consistentLogicalTimestampEnabled);
+    // The timestamp part must match what the writer-side key generator produces for the same flag,
+    // so derive the expectation from RowDataKeyGen instead of hard-coding it.
+    String expectedTimestampVal = RowDataKeyGen.getRecordKey(TimestampData.fromEpochMillis(1), "f_timestamp", consistentLogicalTimestampEnabled);
+    // The literal 1.1 is expected to be rendered at the column's declared scale (2), i.e. "1.10".
+    assertEquals(Collections.singletonList("f_timestamp:" + expectedTimestampVal + KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR + "f_decimal:1.10"), result,
+        "Should return composite key with special type record keys");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithMultipleSimpleKeys() {
+    // Mocked table config: one record key field ("uuid") and one partition field.
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    Configuration flinkConf = new Configuration();
+    String[] keyFields = {"uuid"};
+
+    // Two separate EQUALS filters on the same record key field. Not a typical query shape,
+    // but it pins down how literals coming from several filters are combined.
+    CallExpression uuidIsId1 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("id1", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression uuidIsId2 = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("id2", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    List<ResolvedExpression> filters = Arrays.asList(uuidIsId1, uuidIsId2);
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        flinkConf, metaClient, ExpressionEvaluators.fromExpression(filters), keyFields, TestConfigurations.ROW_TYPE, false);
+
+    // Literals from both filters are expected, in filter order.
+    assertEquals(Arrays.asList("id1", "id2"), hoodieKeys, "Should return multiple values for same field");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithEmptyResult() {
+    // Mocked table config: one record key field ("uuid") and one partition field.
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"uuid"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    Configuration flinkConf = new Configuration();
+    String[] keyFields = {"uuid"};
+
+    // Case 1: a filter on a column outside the record key yields no keys.
+    List<ExpressionEvaluators.Evaluator> nonKeyEvaluators = createColumnStatsProbe(
+        BuiltInFunctionDefinitions.EQUALS, "nonKeyField", Collections.singletonList("val1"));
+    List<String> nonKeyResult = RecordLevelIndex.computeHoodieKeyFromFilters(
+        flinkConf, metaClient, nonKeyEvaluators, keyFields, TestConfigurations.ROW_TYPE, false);
+    assertEquals(Collections.emptyList(), nonKeyResult, "Should return empty list when filtering on non-record key field");
+
+    // Case 2: uuid = 'key1' OR name = 'Bob' mixes a key column with a non-key column,
+    // so no record keys are expected.
+    CallExpression uuidEquals = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("key1", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression nameEquals = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(new FieldReferenceExpression("name", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("Bob", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression mixedOr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.OR,
+        Arrays.asList(uuidEquals, nameEquals),
+        DataTypes.BOOLEAN());
+
+    List<ExpressionEvaluators.Evaluator> mixedOrEvaluators =
+        ExpressionEvaluators.fromExpression(Collections.singletonList(mixedOr));
+    List<String> mixedOrResult = RecordLevelIndex.computeHoodieKeyFromFilters(
+        flinkConf, metaClient, mixedOrEvaluators, keyFields, TestConfigurations.ROW_TYPE, false);
+    assertEquals(Collections.emptyList(), mixedOrResult, "Should return empty list when filtering on or predicate including multiple fields");
+  }
+
+  @Test
+  public void testComputeHoodieKeyFromFiltersWithComplexRecordKeyMultipleValues() {
+    // Mocked table config: composite record key (key1, key2) and one partition field.
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]{"key1", "key2"}));
+    when(tableConfig.getPartitionFields()).thenReturn(Option.of(new String[]{"partition"}));
+
+    Configuration flinkConf = new Configuration();
+    String[] keyFields = {"key1", "key2"};
+
+    // key1 IN ('val1', 'val2') AND key2 IN ('val3', 'val4'): the cross product has 4 keys.
+    CallExpression key1In = CallExpression.permanent(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(new FieldReferenceExpression("key1", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("val1", DataTypes.STRING().notNull()),
+            new ValueLiteralExpression("val2", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    CallExpression key2In = CallExpression.permanent(
+        BuiltInFunctionDefinitions.IN,
+        Arrays.asList(new FieldReferenceExpression("key2", DataTypes.STRING(), 0, 0),
+            new ValueLiteralExpression("val3", DataTypes.STRING().notNull()),
+            new ValueLiteralExpression("val4", DataTypes.STRING().notNull())),
+        DataTypes.BOOLEAN());
+    List<ResolvedExpression> filters = Arrays.asList(key1In, key2In);
+
+    List<String> hoodieKeys = RecordLevelIndex.computeHoodieKeyFromFilters(
+        flinkConf, metaClient, ExpressionEvaluators.fromExpression(filters), keyFields, ROW_TYPE_MULTI_KEYS, false);
+
+    // Expected order: key1 varies slowest, key2 fastest.
+    String sep = KeyGenerator.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+    List<String> expected = Arrays.asList(
+        "key1:val1" + sep + "key2:val3",
+        "key1:val1" + sep + "key2:val4",
+        "key1:val2" + sep + "key2:val3",
+        "key1:val2" + sep + "key2:val4");
+    assertEquals(expected, hoodieKeys, "Should return all combinations of complex record keys");
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 8aaaf3f58711..c12dd314fe06 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -526,6 +526,36 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, expected, true);
   }
 
+  @Test
+  void testDataSkippingWithRecordLevelIndex() throws Exception {
+    TableEnvironment tableEnv = batchTableEnv;
+    // COW table with the metadata table, read-side data skipping and the global record level
+    // index enabled, so that filters on the record key "uuid" can use the record level index.
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), true)
+        .option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    // The queries below only check that the filtered results are correct;
+    // they do not assert how many files were skipped.
+    // equality predicate on the record key
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where uuid = 'id1'").execute().collect());
+    assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
+    // IN predicate on the record key
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where uuid in ('id7', 'id8')").execute().collect());
+    assertRowsEquals(result2, "["
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+    // OR of equality predicates on the record key
+    List<Row> result3 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where uuid = 'id1' or uuid = 'id7' or uuid = 'id8'").execute().collect());
+    assertRowsEquals(result3, "["
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+  }
+
   @ParameterizedTest
   @MethodSource("tableTypeAndBooleanTrueFalseParams")
   void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean 
hiveStylePartitioning) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index e70b9f31863f..3e307d08d624 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -304,7 +304,7 @@ public class TestHoodieTableSource {
 
     // test timestamp filtering
     
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE,
 conf1);
-    HoodieTableSource tableSource1 = createHoodieTableSource(conf1);
+    HoodieTableSource tableSource1 = createHoodieTableSource(conf1, 
TestConfigurations.TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE);
     tableSource1.applyFilters(Collections.singletonList(
         createLitEquivalenceExpr(f1, 0, DataTypes.TIMESTAMP(3).notNull(),
             LocalDateTime.ofInstant(Instant.ofEpochMilli(1), 
ZoneId.of("UTC")))));
@@ -321,7 +321,7 @@ public class TestHoodieTableSource {
     conf2.set(FlinkOptions.RECORD_KEY_FIELD, f2);
     conf2.set(FlinkOptions.ORDERING_FIELDS, f2);
     
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE,
 conf2);
-    HoodieTableSource tableSource2 = createHoodieTableSource(conf2);
+    HoodieTableSource tableSource2 = createHoodieTableSource(conf2, 
TestConfigurations.TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE);
     tableSource2.applyFilters(Collections.singletonList(
         createLitEquivalenceExpr(f2, 1, DataTypes.DATE().notNull(), 
LocalDate.ofEpochDay(1))));
 
@@ -337,7 +337,8 @@ public class TestHoodieTableSource {
     conf3.set(FlinkOptions.RECORD_KEY_FIELD, f3);
     conf3.set(FlinkOptions.ORDERING_FIELDS, f3);
     
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE,
 conf3);
-    HoodieTableSource tableSource3 = createHoodieTableSource(conf3);
+    HoodieTableSource tableSource3 = createHoodieTableSource(conf3, 
TestConfigurations.TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE);
+
     tableSource3.applyFilters(Collections.singletonList(
         createLitEquivalenceExpr(f3, 1, DataTypes.DECIMAL(3, 2).notNull(),
             new BigDecimal("1.11"))));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 38fd4fd4a756..0ae06bc91af5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -184,6 +184,28 @@ public class TestConfigurations {
 
   public static final RowType ROW_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE = 
(RowType) ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getLogicalType();
 
+  /** Resolved table schema with the same fields as {@code ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE}. */
+  public static final ResolvedSchema TABLE_SCHEMA_KEY_SPECIAL_DATA_TYPE = SchemaBuilder.instance()
+      .fields(ROW_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getFieldNames(), ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE.getChildren())
+      .build();
+
+  /**
+   * Non-null row type with one column for each of a range of atomic (non-nested) Flink SQL types,
+   * plus a trailing {@code partition} column.
+   */
+  public static final DataType ROW_DATA_TYPE_WITH_ATOMIC_TYPES = DataTypes.ROW(
+          DataTypes.FIELD("f_bool", DataTypes.BOOLEAN()),
+          DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()),
+          DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()),
+          DataTypes.FIELD("f_int", DataTypes.INT()),
+          DataTypes.FIELD("f_bigint", DataTypes.BIGINT()),
+          DataTypes.FIELD("f_float", DataTypes.FLOAT()),
+          DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
+          DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)),
+          DataTypes.FIELD("f_time", DataTypes.TIME()),
+          DataTypes.FIELD("f_date", DataTypes.DATE()),
+          DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(38, 18)),
+          DataTypes.FIELD("f_str", DataTypes.STRING()),
+          DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+      .notNull();
+
+  /** Logical {@link RowType} view of {@link #ROW_DATA_TYPE_WITH_ATOMIC_TYPES}. */
+  public static final RowType ROW_TYPE_WITH_ATOMIC_TYPES = (RowType) ROW_DATA_TYPE_WITH_ATOMIC_TYPES.getLogicalType();
+
   public static final RowType ROW_TYPE_EVOLUTION_AFTER = (RowType) 
ROW_DATA_TYPE_EVOLUTION_AFTER.getLogicalType();
 
   public static final DataType ROW_DATA_TYPE_BIGINT = DataTypes.ROW(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 3ed961b8d5bf..9020073f2a37 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -426,6 +426,14 @@ public class TestData {
           TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
   );
 
+  /**
+   * Three insert rows over {@code TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES}, in partitions
+   * par1, par2 and par3. Values use Flink's internal {@code RowData} representation: TIME is an int
+   * holding milliseconds of the day, and DATE is an int holding days since the epoch.
+   */
+  public static final List<RowData> DATA_SET_WITH_ATOMIC_TYPES = Arrays.asList(
+      insertRow(TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES, true, (byte) 1, (short) 11, 111, 1111L, 10.11f, 11.111, TimestampData.fromEpochMillis(1),
+          1000, 1, DecimalData.fromBigDecimal(new BigDecimal("1.11"), 38, 18), StringData.fromString("str1"), StringData.fromString("par1")),
+      insertRow(TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES, true, (byte) 2, (short) 22, 222, 2222L, 20.22f, 22.222, TimestampData.fromEpochMillis(2),
+          2000, 2, DecimalData.fromBigDecimal(new BigDecimal("2.22"), 38, 18), StringData.fromString("str2"), StringData.fromString("par2")),
+      insertRow(TestConfigurations.ROW_TYPE_WITH_ATOMIC_TYPES, true, (byte) 3, (short) 33, 333, 3333L, 30.33f, 33.333, TimestampData.fromEpochMillis(3),
+          3000, 3, DecimalData.fromBigDecimal(new BigDecimal("3.33"), 38, 18), StringData.fromString("str3"), StringData.fromString("par3")));
+
   // data types handled specifically for Hoodie Key
   public static List<RowData> DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE = 
new ArrayList<>();
 

Reply via email to