This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3847b44b0d81 [HUDI-9656] Rename index lookup components (#13647)
3847b44b0d81 is described below

commit 3847b44b0d81585a862bcd4168370890f9f59186
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Tue Jul 29 19:45:11 2025 -0700

    [HUDI-9656] Rename index lookup components (#13647)
---
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  21 +-
 .../client/utils/SparkMetadataWriterUtils.java     |   4 +-
 .../hudi/index/SparkMetadataTableRecordIndex.java  |   2 +-
 .../hudi/common/table/view/NoOpTableMetadata.java  |  14 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  77 +++---
 .../hudi/metadata/BloomFilterIndexRawKey.java      |  75 ++++++
 .../metadata/ColumnStatsIndexPrefixRawKey.java     |  95 ++++++++
 .../hudi/metadata/ColumnStatsIndexRawKey.java      |  87 +++++++
 .../metadata/FileSystemBackedTableMetadata.java    |  13 +-
 .../org/apache/hudi/metadata/FilesIndexRawKey.java |  66 ++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 263 +++++++--------------
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  28 +--
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  30 ++-
 .../main/java/org/apache/hudi/metadata/RawKey.java |  33 +++
 .../apache/hudi/metadata/RecordIndexRawKey.java    |  65 +++++
 .../hudi/metadata/SecondaryIndexKeyUtils.java      |  63 ++---
 .../hudi/metadata/SecondaryIndexPrefixRawKey.java  |  64 +++++
 .../TestHoodieBackedTableMetadataDataCleanup.java  |  20 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  33 +--
 .../hudi/metadata/TestSecondaryIndexKeyUtils.java  |   5 +-
 .../apache/hudi/source/stats/FileStatsIndex.java   |  13 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  12 +-
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  22 +-
 .../org/apache/hudi/ExpressionIndexSupport.scala   |  54 ++---
 .../apache/hudi/PartitionStatsIndexSupport.scala   |  10 +-
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |   2 +-
 .../org/apache/hudi/SecondaryIndexSupport.scala    |   2 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |  48 ++--
 .../hudi/functional/RecordLevelIndexTestBase.scala |   4 +-
 .../hudi/functional/TestMetadataRecordIndex.scala  |   2 +-
 .../hudi/feature/index/TestSecondaryIndex.scala    |   3 +-
 .../testHoodieBackedTableMetadataIndexLookup.scala |  26 +-
 .../utilities/HoodieMetadataTableValidator.java    |   2 +-
 33 files changed, 825 insertions(+), 433 deletions(-)

diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index f74b92d36e4b..4f23b94ba6a9 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -78,8 +78,8 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -164,7 +164,6 @@ import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
@@ -1472,10 +1471,9 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       HoodieTableMetadata tableMetadata = metadata(client);
       // prefix search for column (_hoodie_record_key)
-      ColumnIndexID columnIndexID = new 
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
       List<HoodieRecord<HoodieMetadataPayload>> result = 
tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
+          HoodieListData.lazy(Collections.singletonList(new 
ColumnStatsIndexPrefixRawKey(HoodieRecord.RECORD_KEY_METADATA_FIELD))),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
true).collectAsList();
 
       // there are 3 partitions in total and 2 commits. total entries should 
be 6.
       assertEquals(result.size(), 6);
@@ -1486,8 +1484,10 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // prefix search for col(_hoodie_record_key) and first partition. only 2 
files should be matched
       PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
       result = tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
+          HoodieListData.lazy(HoodieTableMetadataUtil.generateColumnStatsKeys(
+              
Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD), 
+              HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
true).collectAsList();
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
       result.forEach(entry -> {
@@ -1504,10 +1504,11 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       });
 
       // prefix search for column {commit time} and first partition
-      columnIndexID = new 
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
       result = tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
+          HoodieListData.lazy(HoodieTableMetadataUtil.generateColumnStatsKeys(
+              
Collections.singletonList(HoodieRecord.COMMIT_TIME_METADATA_FIELD), 
+              HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
true).collectAsList();
 
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 2f4d561e96da..90e56adbe16c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -108,7 +108,6 @@ import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemViewForMetadataTable;
@@ -403,7 +402,8 @@ public class SparkMetadataWriterUtils {
         // Fetch EI column stat records for above files
         List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
             tableMetadata.getRecordsByKeyPrefixes(
-                    
HoodieListData.lazy(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
 partitionName)), indexPartition, false, IDENTITY_ENCODING)
+                    
HoodieListData.lazy(HoodieTableMetadataUtil.generateColumnStatsKeys(validColumnsToIndex,
 partitionName)),
+                    indexPartition, false)
                 // schema and properties are ignored in getInsertValue, so 
simply pass as null
                 .map(record -> record.getData().getInsertValue(null, null))
                 .filter(Option::isPresent)
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
index b37a0506f766..cf9d5c7690e7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
@@ -171,7 +171,7 @@ public class SparkMetadataTableRecordIndex extends 
HoodieIndex<Object, Object> {
 
       // recordIndexInfo object only contains records that are present in 
record_index.
       HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
-          
hoodieTable.getMetadataTable().readRecordIndex(HoodieListData.eager(keysToLookup));
+          
hoodieTable.getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup));
       try {
         Map<String, HoodieRecordGlobalLocation> recordIndexInfo = 
HoodieDataUtils.dedupeAndCollectAsMap(recordIndexData);
         return recordIndexInfo.entrySet().stream()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 6c7cc33f934f..5908d805d773 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListPairData;
 import org.apache.hudi.common.data.HoodiePairData;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.util.Option;
@@ -34,6 +33,7 @@ import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.RawKey;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
@@ -105,20 +105,20 @@ class NoOpTableMetadata implements HoodieTableMetadata {
   }
 
   @Override
-  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndex(HoodieData<String> recordKeys) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys) {
     throw new HoodieMetadataException("Unsupported operation: 
readRecordIndex!");
   }
 
   @Override
-  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String 
partitionName) {
     return HoodieListPairData.eager(Collections.emptyMap());
   }
 
   @Override
-  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-                                                                               
  String partitionName,
-                                                                               
  boolean shouldLoadInMemory,
-                                                                               
  SerializableFunctionUnchecked<String, String> keyEncoder) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(
+      HoodieData<? extends RawKey> rawKeys,
+      String partitionName,
+      boolean shouldLoadInMemory) {
     throw new HoodieMetadataException("Unsupported operation: 
getRecordsByKeyPrefixes!");
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index d98230b503c8..49178158034f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -38,9 +37,6 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
-import org.apache.hudi.common.util.hash.FileIndexID;
-import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetadataException;
@@ -59,15 +55,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
-
 /**
  * Abstract class for implementing common table metadata operations.
  */
@@ -200,23 +192,21 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     }
 
     HoodieTimer timer = HoodieTimer.start();
-    Set<String> partitionIDFileIDStrings = new HashSet<>();
     Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    List<BloomFilterIndexRawKey> bloomFilterKeys = new ArrayList<>();
     partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
-      final String bloomFilterIndexKey = 
HoodieMetadataPayload.getBloomFilterIndexKey(
-          new 
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
 new FileIndexID(partitionNameFileNamePair.getRight()));
-      partitionIDFileIDStrings.add(bloomFilterIndexKey);
-      fileToKeyMap.put(bloomFilterIndexKey, partitionNameFileNamePair);
+      BloomFilterIndexRawKey rawKey = new 
BloomFilterIndexRawKey(partitionNameFileNamePair.getLeft(), 
partitionNameFileNamePair.getRight());
+      bloomFilterKeys.add(rawKey);
+      fileToKeyMap.put(rawKey.encode(), partitionNameFileNamePair);
     });
 
-    List<String> partitionIDFileIDStringsList = new 
ArrayList<>(partitionIDFileIDStrings);
     HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
-        getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList), 
metadataPartitionName, IDENTITY_ENCODING);
+        readIndexRecordsWithKeys(HoodieListData.eager(bloomFilterKeys), 
metadataPartitionName);
     Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords;
     try {
       hoodieRecords = HoodieDataUtils.dedupeAndCollectAsMap(recordsData);
       metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));
-      metrics.ifPresent(m -> 
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR, 
partitionIDFileIDStringsList.size()));
+      metrics.ifPresent(m -> 
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR, 
bloomFilterKeys.size()));
     } finally {
       recordsData.unpersistWithDependencies();
     }
@@ -261,8 +251,8 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
       return Collections.emptyMap();
     }
 
-    Map<String, Pair<String, String>> columnStatKeyToFileNameMap = 
computeColStatKeyToFileName(partitionNameFileNameList, columnNames);
-    return computeFileToColumnStatsMap(columnStatKeyToFileNameMap);
+    Pair<List<ColumnStatsIndexRawKey>, Map<String, Pair<String, String>>> 
rawKeysAndMap = computeColStatRawKeys(partitionNameFileNameList, columnNames);
+    return computeFileToColumnStatsMap(rawKeysAndMap.getLeft(), 
rawKeysAndMap.getRight());
   }
 
   /**
@@ -270,7 +260,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
    */
   protected List<String> fetchAllPartitionPaths() {
     HoodieTimer timer = HoodieTimer.start();
-    Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = 
getRecordByKey(RECORDKEY_PARTITION_LIST,
+    Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = 
readFilesIndexRecords(RECORDKEY_PARTITION_LIST,
         MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
 
@@ -302,7 +292,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     String recordKey = relativePartitionPath.isEmpty() ? NON_PARTITIONED_NAME 
: relativePartitionPath;
 
     HoodieTimer timer = HoodieTimer.start();
-    Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = 
getRecordByKey(recordKey,
+    Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = 
readFilesIndexRecords(recordKey,
         MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
@@ -334,9 +324,12 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
             );
 
     HoodieTimer timer = HoodieTimer.start();
+    List<FilesIndexRawKey> filesKeys = partitionIdToPathMap.keySet().stream()
+        .map(FilesIndexRawKey::new)
+        .collect(Collectors.toList());
     HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
-        getRecordsByKeys(HoodieListData.eager(new 
ArrayList<>(partitionIdToPathMap.keySet())),
-            MetadataPartitionType.FILES.getPartitionPath(), IDENTITY_ENCODING);
+        readIndexRecordsWithKeys(HoodieListData.eager(filesKeys),
+            MetadataPartitionType.FILES.getPartitionPath());
     Map<String, HoodieRecord<HoodieMetadataPayload>> partitionIdRecordPairs;
     try {
       partitionIdRecordPairs = 
HoodieDataUtils.dedupeAndCollectAsMap(recordsData);
@@ -366,39 +359,43 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   /**
-   * Computes a map from col-stats key to partition and file name pair.
+   * Computes raw keys and metadata for column stats lookup.
    *
    * @param partitionNameFileNameList - List of partition and file name pair 
for which bloom filters need to be retrieved.
    * @param columnNames - List of column name for which stats are needed.
+   * @return Pair of raw keys list and a map from encoded key to 
partition/file pair
    */
-  private Map<String, Pair<String, String>> computeColStatKeyToFileName(
+  private Pair<List<ColumnStatsIndexRawKey>, Map<String, Pair<String, 
String>>> computeColStatRawKeys(
       final List<Pair<String, String>> partitionNameFileNameList,
       final List<String> columnNames) {
+    List<ColumnStatsIndexRawKey> rawKeys = new ArrayList<>();
     Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new 
HashMap<>();
+    
     for (String columnName : columnNames) {
-      final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
       for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
-        final String columnStatsIndexKey = 
HoodieMetadataPayload.getColumnStatsIndexKey(
-            new 
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
-            new FileIndexID(partitionNameFileNamePair.getRight()),
-            columnIndexID);
-        columnStatKeyToFileNameMap.put(columnStatsIndexKey, 
partitionNameFileNamePair);
+        ColumnStatsIndexRawKey rawKey = new ColumnStatsIndexRawKey(
+            partitionNameFileNamePair.getLeft(),
+            partitionNameFileNamePair.getRight(),
+            columnName);
+        rawKeys.add(rawKey);
+        columnStatKeyToFileNameMap.put(rawKey.encode(), 
partitionNameFileNamePair);
       }
     }
-    return columnStatKeyToFileNameMap;
+    return Pair.of(rawKeys, columnStatKeyToFileNameMap);
   }
 
   /**
    * Computes the map from partition and file name pair to 
HoodieMetadataColumnStats record.
    *
+   * @param rawKeys - List of raw keys for column stats
    * @param columnStatKeyToFileNameMap - A map from col-stats key to partition 
and file name pair.
    */
-  private Map<Pair<String, String>, List<HoodieMetadataColumnStats>> 
computeFileToColumnStatsMap(Map<String, Pair<String, String>> 
columnStatKeyToFileNameMap) {
-    List<String> columnStatKeylist = new 
ArrayList<>(columnStatKeyToFileNameMap.keySet());
+  private Map<Pair<String, String>, List<HoodieMetadataColumnStats>> 
computeFileToColumnStatsMap(
+      List<ColumnStatsIndexRawKey> rawKeys, Map<String, Pair<String, String>> 
columnStatKeyToFileNameMap) {
     HoodieTimer timer = HoodieTimer.start();
     HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
-        getRecordsByKeys(
-            HoodieListData.eager(columnStatKeylist), 
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), IDENTITY_ENCODING);
+        readIndexRecordsWithKeys(
+            HoodieListData.eager(rawKeys), 
MetadataPartitionType.COLUMN_STATS.getPartitionPath());
     Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords;
     try {
       hoodieRecords = HoodieDataUtils.dedupeAndCollectAsMap(recordsData);
@@ -443,17 +440,17 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
    * @param partitionName The partition name where the record is stored
    * @return Option containing the record if found, empty Option if not found
    */
-  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKey(String key, String partitionName);
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> 
readFilesIndexRecords(String key, String partitionName);
 
   /**
    * Retrieves a collection of pairs (key -> record) from the metadata table 
by its keys.
    *
-   * @param keys The to look up in the metadata table
+   * @param rawKeys The raw keys to look up in the metadata table
    * @param partitionName The partition name where the records are stored
    * @return A collection of pairs (key -> record)
    */
-  public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(
-          HoodieData<String> keys, String partitionName, 
SerializableFunctionUnchecked<String, String> keyEncodingFn);
+  public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
readIndexRecordsWithKeys(
+          HoodieData<? extends RawKey> rawKeys, String partitionName);
 
   /**
    * Returns a collection of pairs (secondary-key -> record-keys) for the 
provided secondary keys.
@@ -463,7 +460,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
    * @return A collection of pairs where each key is a secondary key and the 
value is record key that are indexed by that secondary key.
    * If a secondary key value is mapped to different record keys, they are 
tracked as multiple pairs for each of them.
    */
-  public abstract HoodiePairData<String, String> 
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
+  public abstract HoodiePairData<String, String> 
readSecondaryIndexDataTableRecordKeysWithKeys(HoodieData<String> keys, String 
partitionName);
 
   public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BloomFilterIndexRawKey.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BloomFilterIndexRawKey.java
new file mode 100644
index 000000000000..911a3d78380d
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BloomFilterIndexRawKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for bloom filter metadata.
+ */
+public class BloomFilterIndexRawKey implements RawKey {
+  private final String partitionName;
+  private final String fileName;
+
+  public BloomFilterIndexRawKey(String partitionName, String fileName) {
+    this.partitionName = Objects.requireNonNull(partitionName);
+    this.fileName = Objects.requireNonNull(fileName);
+  }
+
+  @Override
+  public String encode() {
+    return HoodieMetadataPayload.getBloomFilterIndexKey(
+        new 
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionName)),
+        new FileIndexID(fileName));
+  }
+
+  public String getPartitionName() {
+    return partitionName;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BloomFilterIndexRawKey that = (BloomFilterIndexRawKey) o;
+    return Objects.equals(partitionName, that.partitionName) && 
Objects.equals(fileName, that.fileName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(partitionName, fileName);
+  }
+
+  @Override
+  public String toString() {
+    return "BloomFilterRawKey{" + "partitionName='" + partitionName + '\'' + 
", fileName='" + fileName + '\'' + '}';
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexPrefixRawKey.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexPrefixRawKey.java
new file mode 100644
index 000000000000..cb20d6201953
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexPrefixRawKey.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for column stats index consisting of column name and 
optional partition name.
+ */
+public class ColumnStatsIndexPrefixRawKey implements RawKey {
+  private static final long serialVersionUID = 1L;
+  
+  private final String columnName;
+  private final Option<String> partitionName;
+  
+  public ColumnStatsIndexPrefixRawKey(String columnName) {
+    this(columnName, Option.empty());
+  }
+  
+  public ColumnStatsIndexPrefixRawKey(String columnName, String partitionName) 
{
+    this(columnName, Option.of(partitionName));
+  }
+  
+  public ColumnStatsIndexPrefixRawKey(String columnName, Option<String> 
partitionName) {
+    this.columnName = Objects.requireNonNull(columnName, "Column name cannot 
be null");
+    this.partitionName = Objects.requireNonNull(partitionName, "Partition name 
option cannot be null");
+  }
+  
+  public String getColumnName() {
+    return columnName;
+  }
+  
+  public Option<String> getPartitionName() {
+    return partitionName;
+  }
+  
+  @Override
+  public String encode() {
+    ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+    String encodedValue;
+    
+    if (partitionName.isPresent()) {
+      PartitionIndexID partitionIndexId = new PartitionIndexID(
+          
HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName.get()));
+      encodedValue = columnIndexID.asBase64EncodedString()
+          .concat(partitionIndexId.asBase64EncodedString());
+    } else {
+      encodedValue = columnIndexID.asBase64EncodedString();
+    }
+    
+    return encodedValue;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnStatsIndexPrefixRawKey that = (ColumnStatsIndexPrefixRawKey) o;
+    return Objects.equals(columnName, that.columnName) && 
Objects.equals(partitionName, that.partitionName);
+  }
+  
+  @Override
+  public int hashCode() {
+    return Objects.hash(columnName, partitionName);
+  }
+  
+  @Override
+  public String toString() {
+    return "ColumnStatsIndexKey{columnName='" + columnName + "', 
partitionName=" + partitionName + "}";
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexRawKey.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexRawKey.java
new file mode 100644
index 000000000000..3cab656a8275
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexRawKey.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for column stats indexed by partition, file and column.
+ * This is different from {@link ColumnStatsIndexPrefixRawKey}, which is used for prefix lookups.
+ */
+public class ColumnStatsIndexRawKey implements RawKey {
+  private final String partitionName;
+  private final String fileName;
+  private final String columnName;
+
+  /**
+   * Creates a key identifying the column stats of one column within one file of one partition.
+   *
+   * @param partitionName name of the partition; must not be null
+   * @param fileName      name of the file; must not be null
+   * @param columnName    name of the column; must not be null
+   * @throws NullPointerException if any argument is null
+   */
+  public ColumnStatsIndexRawKey(String partitionName, String fileName, String columnName) {
+    this.partitionName = Objects.requireNonNull(partitionName, "Partition name cannot be null");
+    this.fileName = Objects.requireNonNull(fileName, "File name cannot be null");
+    this.columnName = Objects.requireNonNull(columnName, "Column name cannot be null");
+  }
+
+  /**
+   * Builds the encoded key by delegating to
+   * {@link HoodieMetadataPayload#getColumnStatsIndexKey} with the index IDs derived from
+   * the partition identifier, the file name and the column name.
+   *
+   * @return the encoded column stats index key
+   */
+  @Override
+  public String encode() {
+    return HoodieMetadataPayload.getColumnStatsIndexKey(
+        new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName)),
+        new FileIndexID(fileName),
+        new ColumnIndexID(columnName));
+  }
+
+  /**
+   * @return the partition name this key was created with (never null)
+   */
+  public String getPartitionName() {
+    return partitionName;
+  }
+
+  /**
+   * @return the file name this key was created with (never null)
+   */
+  public String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * @return the column name this key was created with (never null)
+   */
+  public String getColumnName() {
+    return columnName;
+  }
+
+  /**
+   * Two keys are equal when they are of exactly the same class and carry the same
+   * partition name, file name and column name.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnStatsIndexRawKey that = (ColumnStatsIndexRawKey) o;
+    return Objects.equals(partitionName, that.partitionName)
+        && Objects.equals(fileName, that.fileName)
+        && Objects.equals(columnName, that.columnName);
+  }
+
+  /**
+   * Hashes the same three fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(partitionName, fileName, columnName);
+  }
+
+  /**
+   * Human-readable form for logs and debugging; the label matches this class's name.
+   */
+  @Override
+  public String toString() {
+    return "ColumnStatsIndexRawKey{" + "partitionName='" + partitionName + '\''
+        + ", fileName='" + fileName + '\'' + ", columnName='" + columnName + '\'' + '}';
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 6b01180696af..2645ef397b93 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
@@ -311,20 +310,20 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   @Override
-  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-                                                                               
  String partitionName,
-                                                                               
  boolean shouldLoadInMemory,
-                                                                               
  SerializableFunctionUnchecked<String, String> keyEncoder) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(
+      HoodieData<? extends RawKey> rawKeys,
+      String partitionName,
+      boolean shouldLoadInMemory) {
     throw new HoodieMetadataException("Unsupported operation: 
getRecordsByKeyPrefixes!");
   }
 
   @Override
-  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndex(HoodieData<String> recordKeys) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys) {
     throw new HoodieMetadataException("Unsupported operation: 
readRecordIndex!");
   }
 
   @Override
-  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String 
partitionName) {
     throw new HoodieMetadataException("Unsupported operation: 
readSecondaryIndex!");
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FilesIndexRawKey.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FilesIndexRawKey.java
new file mode 100644
index 000000000000..78fb9a70536a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FilesIndexRawKey.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for the FILES partition in the metadata table.
+ * This uses identity encoding - the key is used as-is without transformation.
+ */
+public class FilesIndexRawKey implements RawKey {
+  private final String key;
+
+  /**
+   * Creates a raw key wrapping the given string.
+   *
+   * @param key the key string; must not be null
+   * @throws NullPointerException if {@code key} is null
+   */
+  public FilesIndexRawKey(String key) {
+    this.key = Objects.requireNonNull(key, "Key cannot be null");
+  }
+
+  /**
+   * Identity encoding: the wrapped key is returned unchanged.
+   *
+   * @return the key exactly as supplied to the constructor
+   */
+  @Override
+  public String encode() {
+    // Identity encoding - return the key as-is
+    return key;
+  }
+
+  /**
+   * @return the wrapped key string (never null)
+   */
+  public String getKey() {
+    return key;
+  }
+
+  /**
+   * Two keys are equal when they are of exactly the same class and wrap equal strings.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FilesIndexRawKey that = (FilesIndexRawKey) o;
+    return Objects.equals(key, that.key);
+  }
+
+  /**
+   * Hashes the wrapped key, the only field {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(key);
+  }
+
+  /**
+   * Human-readable form for logs and debugging; the label matches this class's name.
+   */
+  @Override
+  public String toString() {
+    return "FilesIndexRawKey{" + "key='" + key + '\'' + '}';
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 77af1e409d94..c196a6912096 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ClosableSortedDedupingIterator;
+import org.apache.hudi.common.util.collection.CloseableFilterIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -99,8 +100,6 @@ import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BAS
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
-import static 
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
@@ -174,9 +173,9 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   @Override
-  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key, String partitionName) {
-    HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData = 
getRecordsByKeys(
-        HoodieListData.eager(Collections.singletonList(key)), partitionName, 
IDENTITY_ENCODING);
+  protected Option<HoodieRecord<HoodieMetadataPayload>> 
readFilesIndexRecords(String key, String partitionName) {
+    HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData = 
readIndexRecordsWithKeys(
+        HoodieListData.eager(Collections.singletonList(new 
FilesIndexRawKey(key))), partitionName);
     try {
       List<HoodieRecord<HoodieMetadataPayload>> records = 
recordsData.values().collectAsList();
       ValidationUtils.checkArgument(records.size() <= 1, () -> "Found more 
than 1 record for record key " + key);
@@ -222,12 +221,12 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   @Override
-  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-                                                                               
  String partitionName,
-                                                                               
  boolean shouldLoadInMemory,
-                                                                               
  SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(
+      HoodieData<? extends RawKey> rawKeys,
+      String partitionName,
+      boolean shouldLoadInMemory) {
     // Apply key encoding
-    List<String> sortedKeyPrefixes = new 
ArrayList<>(keyPrefixes.map(keyEncodingFn::apply).collectAsList());
+    List<String> sortedKeyPrefixes = new ArrayList<>(rawKeys.map(key -> 
key.encode()).collectAsList());
     // Sort the prefixes so that keys are looked up in order
     // Sort must come after encoding.
     Collections.sort(sortedKeyPrefixes);
@@ -243,13 +242,14 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         getEngineContext().parallelize(partitionFileSlices))
         .flatMap(
             (SerializableFunction<FileSlice, 
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice ->
-                lookupRecords(partitionName, sortedKeyPrefixes, fileSlice,
+                readSliceAndFilterByKeysIntoList(partitionName, 
sortedKeyPrefixes, fileSlice,
                     metadataRecord -> {
                       HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
                       String rowKey = payload.key != null ? payload.key : 
metadataRecord.get(KEY_FIELD_NAME).toString();
                       HoodieKey key = new HoodieKey(rowKey, partitionName);
                       return new HoodieAvroRecord<>(key, payload);
-                    }, false));
+                    }, false))
+        .filter(r -> !r.getData().isDeleted());
   }
 
   /**
@@ -259,88 +259,24 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
    * 3. [lookup within file groups] lookup the key in the file group
    * 4. the record is returned
    */
-  /**
-   * Performs lookup of records in the metadata table.
-   *
-   * @param keys               The keys to look up in the metadata table
-   * @param partitionName      The name of the metadata partition to search in
-   * @param fileSlices         The list of file slices to search through
-   * @param keyEncodingFn      Optional function to encode keys before lookup
-   * @return Pair data containing the looked up records keyed by their 
original keys
-   */
-  private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
doLookup(HoodieData<String> keys, String partitionName, List<FileSlice> 
fileSlices,
-                                                                               
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
-    final int numFileSlices = fileSlices.size();
-    if (numFileSlices == 1) {
-      List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
-      TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
-      return lookupKeyRecordPairs(partitionName, new 
ArrayList<>(distinctSortedKeys), fileSlices.get(0));
-    }
-
-    // For SI v2, there are 2 cases require different implementation:
-    // SI write path concatenates secKey$recordKey, the secKey needs extracted 
for hashing;
-    // SI read path gives secKey only, no need for secKey extraction.
-    SerializableBiFunction<String, Integer, Integer> mappingFunction = 
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
-    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction, keyEncodingFn);
-    HoodiePairData<Integer, String> persistedInitialPairData = keys
-        // Tag key with file group index
-        .mapToPair(recordKey -> {
-          String encodedKey = keyEncodingFn.apply(recordKey);
-          // Always encode the key before apply mapping.
-          return new ImmutablePair<>(mappingFunction.apply(encodedKey, 
numFileSlices), encodedKey);
-        });
-    persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
-    dataCleanupManager.trackPersistedData(persistedInitialPairData);
-    // Use the new processValuesOfTheSameShards API instead of explicit 
rangeBasedRepartitionForEachKey
-    SerializableFunction<Iterator<String>, Iterator<Pair<String, 
HoodieRecord<HoodieMetadataPayload>>>> processFunction =
-        sortedKeys -> {
-          List<String> keysList = new ArrayList<>();
-          // Decorate with sorted stream deduplication.
-          try (ClosableSortedDedupingIterator<String> distinctSortedKeyIter = 
new ClosableSortedDedupingIterator<>(sortedKeys)) {
-            if (!distinctSortedKeyIter.hasNext()) {
-              return Collections.emptyIterator();
-            }
-            distinctSortedKeyIter.forEachRemaining(keysList::add);
-          }
-          FileSlice fileSlice = 
fileSlices.get(mappingFunction.apply(keysList.get(0), numFileSlices));
-          return lookupKeyRecordPairsItr(partitionName, keysList, fileSlice);
-        };
-
-    List<Integer> keySpace = IntStream.range(0, 
numFileSlices).boxed().collect(Collectors.toList());
-    HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> result =
-        getEngineContext().mapGroupsByKey(persistedInitialPairData, 
processFunction, keySpace, true)
-            .mapToPair(p -> Pair.of(p.getLeft(), p.getRight()));
-    return result.filter((String k, HoodieRecord<HoodieMetadataPayload> v) -> 
!v.getData().isDeleted());
-  }
-
-  /**
-   * All keys to be looked up go through the following steps:
-   * 1. [encode] escape/encode the key if needed
-   * 2. [hash to file group] compute the hash of the key to
-   * 3. [lookup within file groups] lookup the key in the file group
-   * 4. the record is returned
-   */
-  private HoodieData<HoodieRecord<HoodieMetadataPayload>> 
doLookupIndexRecords(HoodieData<String> keys, String partitionName, 
List<FileSlice> fileSlices,
-                                                                               
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+  private HoodieData<HoodieRecord<HoodieMetadataPayload>> 
lookupIndexRecords(HoodieData<String> keys, String partitionName, 
List<FileSlice> fileSlices) {
     boolean isSecondaryIndex = 
MetadataPartitionType.fromPartitionPath(partitionName).equals(MetadataPartitionType.SECONDARY_INDEX);
 
     final int numFileSlices = fileSlices.size();
     if (numFileSlices == 1) {
-      List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
+      List<String> keysList = keys.collectAsList();
       TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
-      return lookupRecords(partitionName, new ArrayList<>(distinctSortedKeys), 
fileSlices.get(0), !isSecondaryIndex);
+      return readSliceAndFilterByKeysIntoList(partitionName, new 
ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex);
     }
 
     // For SI v2, there are 2 cases require different implementation:
     // SI write path concatenates secKey$recordKey, the secKey needs extracted 
for hashing;
     // SI read path gives secKey only, no need for secKey extraction.
     SerializableBiFunction<String, Integer, Integer> mappingFunction = 
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
-    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction, keyEncodingFn);
+    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction);
     HoodiePairData<Integer, String> persistedInitialPairData = keys
         // Tag key with file group index
-        .mapToPair(recordKey -> {
-          String encodedKey = keyEncodingFn.apply(recordKey);
-          // Always encode the key before apply mapping.
+        .mapToPair(encodedKey -> {
           return new ImmutablePair<>(mappingFunction.apply(encodedKey, 
numFileSlices), encodedKey);
         });
     persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
@@ -361,10 +297,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
           return lookupRecordsItr(partitionName, keysList, fileSlice, 
!isSecondaryIndex);
         };
     List<Integer> keySpace = IntStream.range(0, 
numFileSlices).boxed().collect(Collectors.toList());
-    HoodieData<HoodieRecord<HoodieMetadataPayload>> result =
-        getEngineContext().mapGroupsByKey(persistedInitialPairData, 
processFunction, keySpace, true);
-
-    return result.filter((HoodieRecord<HoodieMetadataPayload> v) -> 
!v.getData().isDeleted());
+    return getEngineContext().mapGroupsByKey(persistedInitialPairData, 
processFunction, keySpace, true);
   }
 
   /**
@@ -375,7 +308,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
    * @param recordKeys List of mapping from keys to the record location.
    */
   @Override
-  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndex(HoodieData<String> recordKeys) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys) {
     // If record index is not initialized yet, we cannot return an empty 
result here unlike the code for reading from other
     // indexes. This is because results from this function are used for 
upserts and returning an empty result here would lead
     // to existing records being inserted again causing duplicates.
@@ -385,7 +318,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
     return dataCleanupManager.ensureDataCleanupOnException(v -> {
       // TODO [HUDI-9544]: Metric does not work for rdd based API due to lazy 
evaluation.
-      return getRecordsByKeys(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
+      HoodieData<RecordIndexRawKey> rawKeys = 
recordKeys.map(RecordIndexRawKey::new);
+      return readIndexRecordsWithKeys(rawKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath())
           .mapToPair((Pair<String, HoodieRecord<HoodieMetadataPayload>> p) -> 
Pair.of(p.getLeft(), p.getRight().getData().getRecordGlobalLocation()));
     });
   }
@@ -406,10 +340,11 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
         "Record index is not initialized in MDT");
     
-    return dataCleanupManager.ensureDataCleanupOnException(v ->
-        readIndexRecords(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
-            .map(r -> r.getData().getRecordGlobalLocation())
-    );
+    return dataCleanupManager.ensureDataCleanupOnException(v -> {
+      HoodieData<RecordIndexRawKey> rawKeys = 
recordKeys.map(RecordIndexRawKey::new);
+      return readIndexRecords(rawKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath())
+            .map(r -> r.getData().getRecordGlobalLocation());
+    });
   }
 
   /**
@@ -420,14 +355,14 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
    * @param secondaryKeys The list of secondary keys to read
    */
   @Override
-  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String 
partitionName) {
     HoodieIndexVersion indexVersion = 
existingIndexVersionOrDefault(partitionName, dataMetaClient);
 
     return dataCleanupManager.ensureDataCleanupOnException(v -> {
       if (indexVersion.equals(HoodieIndexVersion.V1)) {
-        return readSecondaryIndexV1(secondaryKeys, partitionName);
+        return readSecondaryIndexLocationsWithKeysV1(secondaryKeys, 
partitionName);
       } else if (indexVersion.equals(HoodieIndexVersion.V2)) {
-        return readSecondaryIndexV2(secondaryKeys, partitionName);
+        return readSecondaryIndexLocationsWithKeysV2(secondaryKeys, 
partitionName);
       } else {
         throw new IllegalArgumentException("readSecondaryIndex does not 
support index with version " + indexVersion);
       }
@@ -447,9 +382,9 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
     return dataCleanupManager.ensureDataCleanupOnException(v -> {
       if (indexVersion.equals(HoodieIndexVersion.V1)) {
-        return readSecondaryIndexV1(secondaryKeys, partitionName).values();
+        return readSecondaryIndexLocationsWithKeysV1(secondaryKeys, 
partitionName).values();
       } else if (indexVersion.equals(HoodieIndexVersion.V2)) {
-        return 
readRecordIndexLocations(getRecordKeysFromSecondaryKeysV2(secondaryKeys, 
partitionName));
+        return 
readRecordIndexLocations(readSecondaryIndexDataTableRecordKeysV2(secondaryKeys, 
partitionName));
       } else {
         throw new IllegalArgumentException("readSecondaryIndexResult does not 
support index with version " + indexVersion);
       }
@@ -457,27 +392,25 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   @Override
-  public HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(
-      HoodieData<String> keys, String partitionName, 
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
-    List<FileSlice> fileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
-        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
getMetadataFileSystemView(), partitionName));
-    checkState(!fileSlices.isEmpty(), () -> "No file slices found for 
partition: " + partitionName);
-
-    return doLookup(keys, partitionName, fileSlices, keyEncodingFn);
+  public HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
readIndexRecordsWithKeys(
+      HoodieData<? extends RawKey> rawKeys, String partitionName) {
+    return readIndexRecords(rawKeys, partitionName)
+        .mapToPair(record -> Pair.of(record.getRecordKey(), record));
   }
 
-  public HoodieData<String> 
getRecordKeysFromSecondaryKeysV2(HoodieData<String> secondaryKeys, String 
partitionName) {
-    return dataCleanupManager.ensureDataCleanupOnException(v ->
-        readIndexRecords(secondaryKeys, partitionName, 
SecondaryIndexKeyUtils::escapeSpecialChars).map(
-            hoodieRecord -> 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(hoodieRecord.getRecordKey()))
-    );
+  public HoodieData<String> 
readSecondaryIndexDataTableRecordKeysV2(HoodieData<String> secondaryKeys, 
String partitionName) {
+    return dataCleanupManager.ensureDataCleanupOnException(v -> {
+      HoodieData<SecondaryIndexPrefixRawKey> rawKeys = 
secondaryKeys.map(SecondaryIndexPrefixRawKey::new);
+      return readIndexRecords(rawKeys, partitionName)
+            .map(hoodieRecord -> 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(hoodieRecord.getRecordKey()));
+    });
   }
 
-  private HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexV2(HoodieData<String> secondaryKeys, String partitionName) {
-    return readRecordIndex(getRecordKeysFromSecondaryKeysV2(secondaryKeys, 
partitionName));
+  private HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexLocationsWithKeysV2(HoodieData<String> secondaryKeys, String 
partitionName) {
+    return 
readRecordIndexLocationsWithKeys(readSecondaryIndexDataTableRecordKeysV2(secondaryKeys,
 partitionName));
   }
 
-  private HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexV1(HoodieData<String> secondaryKeys, String partitionName) {
+  private HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexLocationsWithKeysV1(HoodieData<String> secondaryKeys, String 
partitionName) {
     // For secondary index v1 we keep the old implementation.
     ValidationUtils.checkState(secondaryKeys instanceof HoodieListData, 
"readSecondaryIndex only support HoodieListData at the moment");
     
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
@@ -487,34 +420,34 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         () -> "Secondary index is not initialized in MDT for: " + 
partitionName);
     // Fetch secondary-index records
     Map<String, Set<String>> secondaryKeyRecords = 
HoodieDataUtils.collectPairDataAsMap(
-        
getSecondaryIndexRecords(HoodieListData.eager(secondaryKeys.collectAsList()), 
partitionName));
+        
readSecondaryIndexDataTableRecordKeysWithKeys(HoodieListData.eager(secondaryKeys.collectAsList()),
 partitionName));
     // Now collect the record-keys and fetch the RLI records
     List<String> recordKeys = new ArrayList<>();
     secondaryKeyRecords.values().forEach(recordKeys::addAll);
-    return readRecordIndex(HoodieListData.eager(recordKeys));
+    return readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys));
   }
 
-  protected HoodieData<HoodieRecord<HoodieMetadataPayload>> 
readIndexRecords(HoodieData<String> keys,
-                                                                             
String partitionName,
-                                                                             
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+  protected HoodieData<HoodieRecord<HoodieMetadataPayload>> 
readIndexRecords(HoodieData<? extends RawKey> rawKeys,
+                                                                             
String partitionName) {
     List<FileSlice> fileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
getMetadataFileSystemView(), partitionName));
     checkState(!fileSlices.isEmpty(), "No file slices found for partition: " + 
partitionName);
-    return doLookupIndexRecords(keys, partitionName, fileSlices, 
keyEncodingFn);
+    
+    // Convert RawKey to String using encode()
+    HoodieData<String> keys = rawKeys.map(key -> key.encode());
+    return lookupIndexRecords(keys, partitionName, fileSlices);
   }
 
   // When testing we noticed that the parallelism can be very low which hurts 
the performance. so we should start with a reasonable
   // level of parallelism in that case.
   private HoodieData<String> repartitioningIfNeeded(
-      HoodieData<String> keys, String partitionName, int numFileSlices, 
SerializableBiFunction<String, Integer, Integer> mappingFunction,
-      SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+      HoodieData<String> keys, String partitionName, int numFileSlices, 
SerializableBiFunction<String, Integer, Integer> mappingFunction) {
     if (keys instanceof HoodieListData) {
       int parallelism;
-      parallelism = (int) keys.map(k -> 
mappingFunction.apply(keyEncodingFn.apply(k), 
numFileSlices)).distinct().count();
+      parallelism = (int) keys.map(k -> mappingFunction.apply(k, 
numFileSlices)).distinct().count();
       // In case of empty lookup set, we should avoid RDD with 0 partitions.
       parallelism = Math.max(parallelism, 1);
-      LOG.info("Repartitioning keys for partition {} from list data with 
parallelism: {}",
-          partitionName, parallelism);
+      LOG.info("Repartitioning keys for partition {} from list data with 
parallelism: {}", partitionName, parallelism);
       keys = getEngineContext().parallelize(keys.collectAsList(), parallelism);
     } else if (keys.getNumPartitions() < 
metadataConfig.getRepartitionMinPartitionsThreshold()) {
       LOG.info("Repartitioning keys for partition {} to {} partitions", 
partitionName, metadataConfig.getRepartitionDefaultPartitions());
@@ -599,21 +532,10 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return FileGroupRecordBufferLoader.createReusable(readerContext);
   }
 
-  private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
lookupKeyRecordPairs(String partitionName,
+  private HoodieData<HoodieRecord<HoodieMetadataPayload>> 
readSliceAndFilterByKeysIntoList(String partitionName,
                                                                                
            List<String> sortedKeys,
-                                                                               
            FileSlice fileSlice) {
-    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> map = new 
HashMap<>();
-    try (ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> 
iterator =
-             lookupKeyRecordPairsItr(partitionName, sortedKeys, fileSlice)) {
-      iterator.forEachRemaining(entry -> map.put(entry.getKey(), 
Collections.singletonList(entry.getValue())));
-    }
-    return HoodieListPairData.eager(map);
-  }
-
-  private HoodieData<HoodieRecord<HoodieMetadataPayload>> lookupRecords(String 
partitionName,
-                                                                        
List<String> sortedKeys,
-                                                                        
FileSlice fileSlice,
-                                                                        
boolean isFullKey) {
+                                                                               
            FileSlice fileSlice,
+                                                                               
            boolean isFullKey) {
     List<HoodieRecord<HoodieMetadataPayload>> res = new ArrayList<>();
     try (ClosableIterator<HoodieRecord<HoodieMetadataPayload>> iterator = 
lookupRecordsItr(partitionName, sortedKeys, fileSlice, isFullKey)) {
       iterator.forEachRemaining(res::add);
@@ -621,38 +543,41 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return HoodieListData.lazy(res);
   }
 
-  private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> 
lookupKeyRecordPairsItr(String partitionName,
-                                                                               
                       List<String> sortedKeys,
-                                                                               
                       FileSlice fileSlice) {
+  private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> 
readSliceAndFilterByKeys(String partitionName,
+                                                                               
                        List<String> sortedKeys,
+                                                                               
                        FileSlice fileSlice) {
     boolean isSecondaryIndex = 
MetadataPartitionType.fromPartitionPath(partitionName).equals(MetadataPartitionType.SECONDARY_INDEX);
-    return lookupRecords(partitionName, sortedKeys, fileSlice, metadataRecord 
-> {
-      HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
-      String rowKey = payload.key != null ? payload.key : 
metadataRecord.get(KEY_FIELD_NAME).toString();
-      HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
-      return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
-    }, !isSecondaryIndex);
+    return new CloseableFilterIterator<>(
+        readSliceAndFilterByKeysIntoList(partitionName, sortedKeys, fileSlice, 
metadataRecord -> {
+          HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
+          String rowKey = payload.key != null ? payload.key : 
metadataRecord.get(KEY_FIELD_NAME).toString();
+          HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
+          return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
+        }, !isSecondaryIndex),
+        p -> !p.getValue().getData().isDeleted());
   }
 
   private ClosableIterator<HoodieRecord<HoodieMetadataPayload>> 
lookupRecordsItr(String partitionName,
                                                                                
  List<String> keys,
                                                                                
  FileSlice fileSlice,
                                                                                
  boolean isFullKey) {
-    return lookupRecords(partitionName, keys, fileSlice,
-        metadataRecord -> {
-          HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
-          return new HoodieAvroRecord<>(new HoodieKey(payload.key, 
partitionName), payload);
-        }, isFullKey);
+    return new CloseableFilterIterator<>(
+      readSliceAndFilterByKeysIntoList(partitionName, keys, fileSlice, 
metadataRecord -> {
+        HoodieMetadataPayload payload = new 
HoodieMetadataPayload(Option.of(metadataRecord));
+        return new HoodieAvroRecord<>(new HoodieKey(payload.key, 
partitionName), payload);
+      }, isFullKey),
+        r -> !r.getData().isDeleted());
   }
 
   /**
    * Lookup records and produce a lazy iterator of mapped HoodieRecords.
    * @param isFullKey If true, perform exact key match. If false, perform 
prefix match.
    */
-  private <T> ClosableIterator<T> lookupRecords(String partitionName,
-                                                List<String> sortedKeys,
-                                                FileSlice fileSlice,
-                                                
SerializableFunctionUnchecked<GenericRecord, T> transformer,
-                                                boolean isFullKey) {
+  private <T> ClosableIterator<T> readSliceAndFilterByKeysIntoList(String 
partitionName,
+                                                                   
List<String> sortedKeys,
+                                                                   FileSlice 
fileSlice,
+                                                                   
SerializableFunctionUnchecked<GenericRecord, T> transformer,
+                                                                   boolean 
isFullKey) {
     // If no keys to lookup, we must return early, otherwise, the hfile lookup 
will return all records.
     if (sortedKeys.isEmpty()) {
       return new EmptyIterator<>();
@@ -693,16 +618,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
           .map(Literal::from)
           .collect(Collectors.toList()));
     } else {
-      // For secondary index we need an extra step of processing the key to 
lookup before handing it over to filegroup reader.
-      if (MetadataPartitionType.fromPartitionPath(partitionName)
-          .equals(MetadataPartitionType.SECONDARY_INDEX)) {
-        // For secondary index, always use prefix matching
-        return Predicates.startsWithAny(null,
-            sortedKeys.stream()
-                .map(escapedKey -> escapedKey + 
SECONDARY_INDEX_RECORD_KEY_SEPARATOR)
-                .map(Literal::from)
-                .collect(Collectors.toList()));
-      }
       // For non-secondary index with prefix matching
       return Predicates.startsWithAny(null, sortedKeys.stream()
           .map(Literal::from)
@@ -843,37 +758,37 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   @Override
-  public HoodiePairData<String, String> 
getSecondaryIndexRecords(HoodieData<String> secondaryKeys, String 
partitionName) {
+  public HoodiePairData<String, String> 
readSecondaryIndexDataTableRecordKeysWithKeys(HoodieData<String> secondaryKeys, 
String partitionName) {
     HoodieIndexVersion indexVersion = 
existingIndexVersionOrDefault(partitionName, dataMetaClient);
 
     return dataCleanupManager.ensureDataCleanupOnException(v -> {
       if (indexVersion.equals(HoodieIndexVersion.V1)) {
-        return getSecondaryIndexRecordsV1(secondaryKeys, partitionName);
+        return readSecondaryIndexDataTableRecordKeysWithKeysV1(secondaryKeys, 
partitionName);
       } else if (indexVersion.equals(HoodieIndexVersion.V2)) {
-        return getSecondaryIndexRecordsV2(secondaryKeys, partitionName);
+        return readSecondaryIndexDataTableRecordKeysWithKeysV2(secondaryKeys, 
partitionName);
       } else {
         throw new IllegalArgumentException("getSecondaryIndexRecords does not 
support index with version " + indexVersion);
       }
     });
   }
 
-  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
+  private HoodiePairData<String, String> 
readSecondaryIndexDataTableRecordKeysWithKeysV1(HoodieData<String> keys, String 
partitionName) {
     if (keys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
 
-    return getRecordsByKeyPrefixes(keys, partitionName, false, 
SecondaryIndexKeyUtils::escapeSpecialChars)
-        .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
+    HoodieData<SecondaryIndexPrefixRawKey> rawKeys = 
keys.map(SecondaryIndexPrefixRawKey::new);
+    return getRecordsByKeyPrefixes(rawKeys, partitionName, false)
         .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
   }
 
-  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String 
partitionName) {
+  private HoodiePairData<String, String> 
readSecondaryIndexDataTableRecordKeysWithKeysV2(HoodieData<String> 
secondaryKeys, String partitionName) {
     if (secondaryKeys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
 
-    return readIndexRecords(secondaryKeys, partitionName, 
SecondaryIndexKeyUtils::escapeSpecialChars)
-        .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
+    HoodieData<SecondaryIndexPrefixRawKey> rawKeys = 
secondaryKeys.map(SecondaryIndexPrefixRawKey::new);
+    return readIndexRecords(rawKeys, partitionName)
         .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 8035d31af5c2..6323d13ffe1a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -230,42 +229,43 @@ public interface HoodieTableMetadata extends 
Serializable, AutoCloseable {
    * Returns pairs of (record key, location of record key) which are found in 
the record index.
    * Records that are not found are ignored and wont be part of map object 
that is returned.
    */
-  HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndex(HoodieData<String> recordKeys);
+  HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys);
 
   /**
    * Returns the location of record keys which are found in the record index.
    * Records that are not found are ignored and wont be part of map object 
that is returned.
    */
   default HoodieData<HoodieRecordGlobalLocation> 
readRecordIndexLocations(HoodieData<String> recordKeys) {
-    return readRecordIndex(recordKeys).values();
+    return readRecordIndexLocationsWithKeys(recordKeys).values();
   }
 
   /**
    * Returns pairs of (secondary key, location of secondary key) which the 
provided secondary keys maps to.
    * Records that are not found are ignored and won't be part of map object 
that is returned.
    */
-  HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName);
+  HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String 
partitionName);
 
   /**
    * Returns the location of secondary keys which are found in the secondary 
index.
    * Records that are not found are ignored and won't be part of map object 
that is returned.
    */
   default HoodieData<HoodieRecordGlobalLocation> 
readSecondaryIndexLocations(HoodieData<String> secondaryKeys, String 
partitionName) {
-    return readSecondaryIndex(secondaryKeys, partitionName).values();
+    return readSecondaryIndexLocationsWithKeys(secondaryKeys, 
partitionName).values();
   }
 
   /**
-   * Fetch records by key prefixes. Key prefix passed is expected to match the 
same prefix as stored in Metadata table partitions. For eg, in case of col 
stats partition,
-   * actual keys in metadata partition is encoded values of column name, 
partition name and file name. So, key prefixes passed to this method is 
expected to be encoded already.
+   * Fetch records by key prefixes. The raw keys are encoded using their 
encode() method to generate
+   * the actual key prefixes used for lookup in the metadata table partitions.
    *
-   * @param keyPrefixes   list of key prefixes for which interested records 
are looked up for.
-   * @param partitionName partition name in metadata table where the records 
are looked up for.
-   * @return {@link HoodieData} of {@link HoodieRecord}s with records matching 
the passed in key prefixes.
+   * @param rawKeys           list of raw key objects to be encoded into key 
prefixes
+   * @param partitionName     partition name in metadata table where the 
records are looked up for
+   * @param shouldLoadInMemory whether to load records in memory
+   * @return {@link HoodieData} of {@link HoodieRecord}s with records matching 
the encoded key prefixes
    */
-  HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-                                                                          
String partitionName,
-                                                                          
boolean shouldLoadInMemory,
-                                                                          
SerializableFunctionUnchecked<String, String> keyEncoder);
+  HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(
+      HoodieData<? extends RawKey> rawKeys,
+      String partitionName,
+      boolean shouldLoadInMemory);
 
   /**
    * Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 144f2a7be44a..6fef6d3febeb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -55,7 +55,6 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableBiFunction;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.FileSlice;
@@ -210,7 +209,6 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_EXPRESSION_INDEX_PREFIX = 
"expr_index_";
   public static final String PARTITION_NAME_SECONDARY_INDEX = 
"secondary_index";
   public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX = 
"secondary_index_";
-  public static final SerializableFunctionUnchecked<String, String> 
IDENTITY_ENCODING = key -> key;
 
   private static final Set<Schema.Type> SUPPORTED_TYPES_PARTITION_STATS = new 
HashSet<>(Arrays.asList(
       Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, 
Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.NULL, 
Schema.Type.BYTES));
@@ -1366,7 +1364,7 @@ public class HoodieTableMetadataUtil {
   public static SerializableBiFunction<String, Integer, Integer> 
getSecondaryKeyToFileGroupMappingFunction(boolean needsSecondaryKeyExtraction) {
     if (needsSecondaryKeyExtraction) {
       return (recordKey, numFileGroups) -> {
-        String secondaryKey = 
SecondaryIndexKeyUtils.getUnescapedSecondaryKeyFromSecondaryIndexKey(recordKey);
+        String secondaryKey = 
SecondaryIndexKeyUtils.getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey(recordKey);
         return mapRecordKeyToFileGroupIndex(secondaryKey, numFileGroups);
       };
     }
@@ -2716,7 +2714,8 @@ public class HoodieTableMetadataUtil {
           // Fetch metadata table COLUMN_STATS partition records for above 
files
           List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata 
= tableMetadata
               .getRecordsByKeyPrefixes(
-                  HoodieListData.lazy(generateKeyPrefixes(colsToIndex, 
partitionName)), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false, 
IDENTITY_ENCODING)
+                  HoodieListData.lazy(generateColumnStatsKeys(colsToIndex, 
partitionName)), 
+                  MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
               // schema and properties are ignored in getInsertValue, so 
simply pass as null
               .map(record -> 
((HoodieMetadataPayload)record.getData()).getColumnStatMetadata())
               .filter(Option::isPresent)
@@ -2767,18 +2766,25 @@ public class HoodieTableMetadataUtil {
 
   /**
    * Generate key prefixes for each combination of column name in {@param 
columnsToIndex} and {@param partitionName}.
+   * @deprecated Use {@link #generateColumnStatsKeys} instead
    */
+  @Deprecated
   public static List<String> generateKeyPrefixes(List<String> columnsToIndex, 
String partitionName) {
-    List<String> keyPrefixes = new ArrayList<>();
-    PartitionIndexID partitionIndexId = new 
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionName));
+    List<ColumnStatsIndexPrefixRawKey> rawKeys = 
generateColumnStatsKeys(columnsToIndex, partitionName);
+    return rawKeys.stream()
+        .map(key -> key.encode())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Generate column stats index keys for each combination of column name in 
{@code columnsToIndex} and {@code partitionName}.
+   */
+  public static List<ColumnStatsIndexPrefixRawKey> 
generateColumnStatsKeys(List<String> columnsToIndex, String partitionName) {
+    List<ColumnStatsIndexPrefixRawKey> keys = new ArrayList<>();
     for (String columnName : columnsToIndex) {
-      ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
-      String keyPrefix = columnIndexID.asBase64EncodedString()
-          .concat(partitionIndexId.asBase64EncodedString());
-      keyPrefixes.add(keyPrefix);
+      keys.add(new ColumnStatsIndexPrefixRawKey(columnName, partitionName));
     }
-
-    return keyPrefixes;
+    return keys;
   }
 
   private static List<HoodieColumnRangeMetadata<Comparable>> 
translateWriteStatToFileStats(HoodieWriteStat writeStat,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/RawKey.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/RawKey.java
new file mode 100644
index 000000000000..0d58727bf11d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/RawKey.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+
+/**
+ * Interface for raw keys that can be encoded for storage in the metadata 
table.
+ */
+public interface RawKey extends Serializable {
+  
+  /**
+   * Encode this raw key into a string for storage.
+   * @return The encoded string key
+   */
+  String encode();
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/RecordIndexRawKey.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/RecordIndexRawKey.java
new file mode 100644
index 000000000000..23e9d127a96c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/RecordIndexRawKey.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.Objects;
+
+/**
+ * Represents a record index key that requires no encoding (identity encoding).
+ */
+public class RecordIndexRawKey implements RawKey {
+  private final String recordKey;
+
+  public RecordIndexRawKey(String recordKey) {
+    this.recordKey = Objects.requireNonNull(recordKey);
+  }
+
+  @Override
+  public String encode() {
+    // Identity encoding - return the key as-is
+    return recordKey;
+  }
+
+  public String getRecordKey() {
+    return recordKey;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RecordIndexRawKey that = (RecordIndexRawKey) o;
+    return Objects.equals(recordKey, that.recordKey);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(recordKey);
+  }
+
+  @Override
+  public String toString() {
+    return "RecordIndexRawKey{" + "recordKey='" + recordKey + '\'' + '}';
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
index e93614e719ea..2b68fba85e3d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
@@ -31,68 +31,55 @@ public class SecondaryIndexKeyUtils {
   // Escape character
   public static final char ESCAPE_CHAR = '\\';
 
-  /**
-   * Use this function if you want to get both record key and secondary key.
-   *
-   * @returns pair of secondary key, record key.
-   * */
+  // Given "<encoded secondaryKey>$<encoded primaryKey>",
+  // extract Pair<<secondaryKey>,<primaryKey>>.
   public static Pair<String, String> getSecondaryKeyRecordKeyPair(String 
secIdxRecKey) {
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
     return Pair.of(unescapeSpecialChars(secIdxRecKey.substring(0, 
delimiterIndex)), unescapeSpecialChars(secIdxRecKey.substring(delimiterIndex + 
1)));
   }
 
-  /**
-   * Use this function if you want to get both record key and secondary key.
-   *
-   * @returns pair of secondary key, record key.
-   * */
+  // Given "<encoded secondaryKey>$<encoded primaryKey>",
+  // extract Pair<<primaryKey>,<secondaryKey>>.
   public static Pair<String, String> getRecordKeySecondaryKeyPair(String 
secIdxRecKey) {
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
     return Pair.of(unescapeSpecialChars(secIdxRecKey.substring(delimiterIndex 
+ 1)), unescapeSpecialChars(secIdxRecKey.substring(0, delimiterIndex)));
   }
 
-  /**
-   * Extracts the record key portion from an encoded secondary index key.
-   *
-   * @param secIdxRecKey the encoded key in the form 
"escapedSecondaryKey$escapedRecordKey"
-   * @return the unescaped record key, or {@code null} if the record key was 
{@code null}
-   * @throws IllegalStateException if the key format is invalid (i.e., no 
unescaped separator found)
-   */
+  // Given "<encoded secondaryKey>$<encoded primaryKey>",
+  // extract <primaryKey>.
   public static String getRecordKeyFromSecondaryIndexKey(String secIdxRecKey) {
-    // the payload key is in the format of "secondaryKey$primaryKey"
-    // we need to extract the primary key from the payload key
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
     return unescapeSpecialChars(secIdxRecKey.substring(delimiterIndex + 1));
   }
 
-  /**
-   * Extracts the secondary key portion from an encoded secondary index key.
-   *
-   * @param secIdxRecKey the encoded key in the form 
"escapedSecondaryKey$escapedRecordKey"
-   * @return the unescaped secondary key, or {@code null} if the secondary key 
was {@code null}
-   * @throws IllegalStateException if the key format is invalid (i.e., no 
unescaped separator found)
-   */
+  // Given "<encoded secondaryKey>$<encoded primaryKey>",
+  // extract <secondaryKey>.
   public static String getSecondaryKeyFromSecondaryIndexKey(String 
secIdxRecKey) {
-    // the payload key is in the format of "secondaryKey$primaryKey"
-    // we need to extract the secondary key from the payload key
     return 
unescapeSpecialChars(getUnescapedSecondaryKeyFromSecondaryIndexKey(secIdxRecKey));
   }
 
+  // Given "<encoded secondaryKey>$<encoded primaryKey>",
+  // extract "<encoded secondaryKey>$" (separator kept; no unescaping applied).
+  public static String 
getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey(String secIdxRecKey) {
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
+    return secIdxRecKey.substring(0, delimiterIndex + 1);
+  }
+
+  // Given <secondaryKey>,
+  // construct "<encoded secondaryKey>$".
+  public static String getEscapedSecondaryKeyPrefixFromSecondaryKey(String 
secKey) {
+    return String.format("%s%s", escapeSpecialChars(secKey), 
SECONDARY_INDEX_RECORD_KEY_SEPARATOR_CHAR);
+  }
+
+  // Given "<encoded secondaryKey>$<encoded primaryKey>",
+  // extract "<encoded secondaryKey>" (no unescaping applied).
   public static String getUnescapedSecondaryKeyFromSecondaryIndexKey(String 
secIdxRecKey) {
-    // the payload key is in the format of "secondaryKey$primaryKey"
-    // we need to extract the secondary key from the payload key
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
     return secIdxRecKey.substring(0, delimiterIndex);
   }
 
-  /**
-   * Constructs an encoded secondary index key by escaping the given secondary 
and record keys,
-   * and concatenating them with the separator {@code "$"}.
-   *
-   * @param unescapedSecKey the secondary key (can be {@code null})
-   * @param unescapedRecordKey the record key (can be {@code null})
-   * @return a string representing the encoded secondary index key
-   */
+  // Given <secondaryKey> and <primaryKey>,
+  // construct "<encoded secondaryKey>$<encoded primaryKey>".
   public static String constructSecondaryIndexKey(String unescapedSecKey, 
String unescapedRecordKey) {
     return escapeSpecialChars(unescapedSecKey) + 
SECONDARY_INDEX_RECORD_KEY_SEPARATOR + escapeSpecialChars(unescapedRecordKey);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexPrefixRawKey.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexPrefixRawKey.java
new file mode 100644
index 000000000000..fc442e07e0fe
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexPrefixRawKey.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.Objects;
+
+/**
+ * Represents a secondary index key, whose raw content is the column value of 
the data table.
+ */
+public class SecondaryIndexPrefixRawKey implements RawKey {
+  private final String secondaryKey;
+
+  public SecondaryIndexPrefixRawKey(String secondaryKey) {
+    this.secondaryKey = secondaryKey;
+  }
+
+  @Override
+  public String encode() {
+    return 
SecondaryIndexKeyUtils.getEscapedSecondaryKeyPrefixFromSecondaryKey(secondaryKey);
+  }
+
+  public String getSecondaryKey() {
+    return secondaryKey;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SecondaryIndexPrefixRawKey that = (SecondaryIndexPrefixRawKey) o;
+    return Objects.equals(secondaryKey, that.secondaryKey);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(secondaryKey);
+  }
+
+  @Override
+  public String toString() {
+    return "SecondaryIndexKey{" + "secondaryKey='" + secondaryKey + '\'' + '}';
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
index c00ba7ac1556..dea2011f4a15 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
@@ -98,14 +98,14 @@ public class TestHoodieBackedTableMetadataDataCleanup {
     HoodieData<String> recordKeys = HoodieListData.eager(Arrays.asList("key1", 
"key2"));
     
     // Setup mock behavior
-    when(mockMetadata.getRecordsByKeys(any(), any(), 
any())).thenReturn(mockPairData);
+    when(mockMetadata.readIndexRecordsWithKeys(any(), 
any())).thenReturn(mockPairData);
     when(mockPairData.mapToPair(any())).thenReturn(mockResult);
     
     // Call real method on the mock
-    when(mockMetadata.readRecordIndex(recordKeys)).thenCallRealMethod();
+    
when(mockMetadata.readRecordIndexLocationsWithKeys(recordKeys)).thenCallRealMethod();
     
     // Execute the method
-    HoodiePairData result = mockMetadata.readRecordIndex(recordKeys);
+    HoodiePairData result = 
mockMetadata.readRecordIndexLocationsWithKeys(recordKeys);
     
     // Verify cleanup manager was invoked
     verify(spyCleanupManager).ensureDataCleanupOnException(any());
@@ -124,7 +124,7 @@ public class TestHoodieBackedTableMetadataDataCleanup {
     
     // Setup mock behavior for readIndexRecords
     HoodieData mockIndexRecords = mock(HoodieData.class);
-    when(mockMetadata.readIndexRecords(any(), anyString(), 
any())).thenReturn(mockIndexRecords);
+    when(mockMetadata.readIndexRecords(any(), 
anyString())).thenReturn(mockIndexRecords);
     when(mockIndexRecords.map(any())).thenReturn(mockHoodieData);
     
     // Call real method on the mock
@@ -155,15 +155,15 @@ public class TestHoodieBackedTableMetadataDataCleanup {
           .thenReturn(HoodieIndexVersion.V2);
       
       // Setup mock behavior for V2 path
-      when(mockMetadata.getRecordKeysFromSecondaryKeysV2(any(), 
anyString())).thenReturn(mockHoodieData);
+      when(mockMetadata.readSecondaryIndexDataTableRecordKeysV2(any(), 
anyString())).thenReturn(mockHoodieData);
       when(mockPairData.mapToPair(any())).thenReturn(mockResult);
       
       // Call real method on the mock
-      when(mockMetadata.readSecondaryIndex(secondaryKeys, 
partitionName)).thenCallRealMethod();
+      when(mockMetadata.readSecondaryIndexLocationsWithKeys(secondaryKeys, 
partitionName)).thenCallRealMethod();
       
       // Execute the method - it may throw NPE due to mocks, but we just want 
to verify cleanup manager is called
       try {
-        mockMetadata.readSecondaryIndex(secondaryKeys, partitionName);
+        mockMetadata.readSecondaryIndexLocationsWithKeys(secondaryKeys, 
partitionName);
       } catch (Exception e) {
         // Expected - we're testing with mocks
       }
@@ -189,7 +189,7 @@ public class TestHoodieBackedTableMetadataDataCleanup {
       
       // Setup mock behavior for V2 path
       when(mockPairData.values()).thenReturn(mockHoodieData);
-      when(mockMetadata.getRecordKeysFromSecondaryKeysV2(any(), 
anyString())).thenReturn(mockHoodieData);
+      when(mockMetadata.readSecondaryIndexDataTableRecordKeysV2(any(), 
anyString())).thenReturn(mockHoodieData);
       
when(mockMetadata.readRecordIndexLocations(any())).thenReturn(mockHoodieData);
       
       // Call real method on the mock
@@ -221,12 +221,12 @@ public class TestHoodieBackedTableMetadataDataCleanup {
     
doThrow(testException).when(mockCleanupManager).ensureDataCleanupOnException(any());
     
     // Call real method on the mock
-    when(mockMetadata.readRecordIndex(any())).thenCallRealMethod();
+    
when(mockMetadata.readRecordIndexLocationsWithKeys(any())).thenCallRealMethod();
     
     // Execute and verify exception is propagated
     HoodieData<String> recordKeys = 
HoodieListData.eager(Arrays.asList("key1"));
     try {
-      mockMetadata.readRecordIndex(recordKeys);
+      mockMetadata.readRecordIndexLocationsWithKeys(recordKeys);
       fail("Expected exception was not thrown");
     } catch (HoodieException e) {
       assertEquals("Test exception from cleanup manager", e.getMessage());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 5900fdafbdde..b864fc29abf5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -23,38 +23,31 @@ import 
org.apache.hudi.common.function.SerializableBiFunction;
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.constructSecondaryIndexKey;
-import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.escapeSpecialChars;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 public class TestHoodieTableMetadataUtil {
 
   @Test
   public void testGetRecordKeyToFileGroupIndexFunction() {
-    // Test with secondary key format
-    String compositeKey = "secondaryKey$recordKey";
+    int numFileGroups = 10;
+    String recordKey = "recordKey$";
+    String secondaryKey = "secondaryKey$";
+    // Raw key used for read path
+    SecondaryIndexPrefixRawKey rawKey1 = new 
SecondaryIndexPrefixRawKey(secondaryKey);
+    // Composite key used for write path
+    String compositeKey = constructSecondaryIndexKey(secondaryKey, recordKey);
 
     SerializableBiFunction<String, Integer, Integer> hashOnSecKeyOnly =
         
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(true);
     SerializableBiFunction<String, Integer, Integer> hashOnFullKey =
         
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(false);
 
-    int result1 = hashOnSecKeyOnly.apply(compositeKey, 10);
-    int result2 = 
hashOnSecKeyOnly.apply("anotherSecondaryKey$anotherRecordKey", 10);
+    // On write path we use hashOnSecKeyOnly
+    int result1 = hashOnSecKeyOnly.apply(compositeKey, numFileGroups);
+    // On read path, we use hashOnFullKey
+    int result2 = hashOnFullKey.apply(rawKey1.encode(), numFileGroups);
 
-    // Both should hash the secondary key portion
-    assertNotEquals(result1, result2);
-
-    // Test with regular key format
-    int result3 = hashOnFullKey.apply("simpleKey", 10);
-    int result4 = hashOnFullKey.apply("anotherSimpleKey", 10);
-
-    // Both should hash the full key
-    assertNotEquals(result3, result4);
-
-    // Hash on Sec key only <=> Hash full key if key equals to UNESCAPED sec 
key
-    assertEquals(hashOnSecKeyOnly.apply("secKey$recKey", 10), 
hashOnFullKey.apply("secKey", 10));
-    assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("seckey", 
"reckey"), 10), hashOnFullKey.apply("seckey", 10));
-    assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("$", 
"reckey"), 10), hashOnFullKey.apply(escapeSpecialChars("$"), 10));
+    // Both should hash the secondary key portion so read and write paths are 
consistent.
+    assertEquals(result1, result2);
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
index 1ace3e6fa6a4..b9747306719d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -201,8 +202,10 @@ public class TestSecondaryIndexKeyUtils {
   @ParameterizedTest(name = "Key construction round-trip: secondaryKey='{0}', 
recordKey='{1}'")
   @MethodSource("keyConstructionRoundTripTestCases")
   public void testKeyConstructionRoundTrip(String secondaryKey, String 
recordKey) {
-    // Construct the key
+    // Construct the key used by the writer path
     String constructedKey = 
SecondaryIndexKeyUtils.constructSecondaryIndexKey(secondaryKey, recordKey);
+    // The key used by the reader path and the key used by the writer path 
have the following invariant.
+    assertEquals(new SecondaryIndexPrefixRawKey(secondaryKey).encode(), 
getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey(constructedKey));
     
     // Extract both parts
     String extractedSecondary = 
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(constructedKey);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index 0c1c296feab4..06b92ba229d4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -28,8 +28,8 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Tuple3;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -63,7 +63,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_DATA_TYPE;
 import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_TARGET_POS;
 import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.METADATA_DATA_TYPE;
@@ -389,15 +388,15 @@ public class FileStatsIndex implements ColumnStatsIndex {
         "Column stats is only valid when push down filters have referenced 
columns");
 
     // Read Metadata Table's column stats Flink's RowData list by
-    //    - Fetching the records by key-prefixes (encoded column names)
+    //    - Fetching the records by key-prefixes (column names)
     //    - Deserializing fetched records into [[RowData]]s
-    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
-    List<String> encodedTargetColumnNames = Arrays.stream(targetColumns)
-        .map(colName -> new 
ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
+    List<ColumnStatsIndexPrefixRawKey> rawKeys = Arrays.stream(targetColumns)
+        .map(ColumnStatsIndexPrefixRawKey::new)  // Just column name, no 
partition
+        .collect(Collectors.toList());
 
     HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
         getMetadataTable().getRecordsByKeyPrefixes(
-            HoodieListData.lazy(encodedTargetColumnNames), 
getIndexPartitionName(), false, IDENTITY_ENCODING);
+            HoodieListData.lazy(rawKeys), getIndexPartitionName(), false);
 
     org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter 
converter =
         AvroToRowDataConverters.createRowConverter((RowType) 
METADATA_DATA_TYPE.getLogicalType());
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 39edfd2c52f6..fe8bf929a0fc 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -758,29 +758,29 @@ public class TestHoodieTableMetadataUtil extends 
HoodieCommonTestHarness {
         // Test case 2: Secondary index record key with version >= 2
         Arguments.of(
             "Secondary index record key with version >= 2",
-            "primary_key$secondary_key",
+            "secondary_key$primary_key",
             10,
             "secondary_index_idx_ts",
             V2,
-            6  // Uses secondary key portion for hashing
+            8  // Uses secondary key portion for hashing
         ),
         // Test case 3: Secondary index record key but version < 2
         Arguments.of(
             "Secondary index record key but version < 2",
-            "primary_key$secondary_key",
+            "secondary_key$primary_key",
             10,
             "secondary_index_idx_ts",
             HoodieIndexVersion.V1,
-            4  // Uses full key for hashing
+            0  // Uses full key for hashing
         ),
         // Test case 4: Secondary index record key but not in secondary index 
partition
         Arguments.of(
             "Secondary index record key but not in secondary index partition",
-            "primary_key$secondary_key",
+            "secondary_key$primary_key",
             10,
             "files",
             HoodieIndexVersion.V1,
-            4  // Uses full key for hashing since not in secondary index 
partition
+            0  // Uses full key for hashing since not in secondary index 
partition
         ),
         // Test case 7: Single file group
         Arguments.of(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 204a50ca2a03..0a1aed48efaf 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection
 import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
 import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.metadata.{ColumnStatsIndexPrefixRawKey, 
HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, 
MetadataPartitionType}
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
 import org.apache.hudi.util.JFunction
 
@@ -342,23 +342,19 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     //    - Filtering out nulls
     checkState(targetColumns.nonEmpty)
 
-    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
-    val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
-    // encode column name and parition name if partition list is available
-    val keyPrefixes = if (prunedPartitions.isDefined) {
-      prunedPartitions.get.map(partitionPath =>
-        new 
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
-      ).flatMap(encodedPartition => {
-        encodedTargetColumnNames.map(encodedTargetColumn => 
encodedTargetColumn.concat(encodedPartition))
-      })
+    // Create raw key prefixes based on column names and optional partition 
names
+    val rawKeys = if (prunedPartitions.isDefined) {
+      val partitionsList = prunedPartitions.get.toList
+      targetColumns.flatMap(colName =>
+        partitionsList.map(partitionPath => new 
ColumnStatsIndexPrefixRawKey(colName, partitionPath))
+      )
     } else {
-      encodedTargetColumnNames
+      targetColumns.map(colName => new ColumnStatsIndexPrefixRawKey(colName))
     }
 
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
-        HoodieListData.eager(keyPrefixes.toSeq.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
-        HoodieTableMetadataUtil.IDENTITY_ENCODING)
+        HoodieListData.eager(rawKeys.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
 
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index f81bda62ac38..e4544c82e70e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
 import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.index.expression.HoodieExpressionIndex
-import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.metadata.{ColumnStatsIndexPrefixRawKey, 
HoodieMetadataPayload, HoodieTableMetadataUtil, MetadataPartitionType}
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
 import org.apache.hudi.util.JFunction
 
@@ -344,23 +344,19 @@ class ExpressionIndexSupport(spark: SparkSession,
     //    - Filtering out nulls
     checkState(targetColumns.nonEmpty)
 
-    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
-    val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
-    // encode column name and parition name if partition list is available
-    val keyPrefixes = if (prunedPartitions.isDefined) {
-      prunedPartitions.get.map(partitionPath =>
-        new 
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
-      ).flatMap(encodedPartition => {
-        encodedTargetColumnNames.map(encodedTargetColumn => 
encodedTargetColumn.concat(encodedPartition))
-      })
+    // Create raw key prefixes based on column names and optional partition 
names
+    val rawKeys = if (prunedPartitions.isDefined) {
+      val partitionsList = prunedPartitions.get.toList
+      targetColumns.flatMap(colName =>
+        partitionsList.map(partitionPath => new 
ColumnStatsIndexPrefixRawKey(colName, partitionPath))
+      )
     } else {
-      encodedTargetColumnNames
+      targetColumns.map(colName => new ColumnStatsIndexPrefixRawKey(colName))
     }
 
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
-        HoodieListData.eager(keyPrefixes.toSeq.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
-        HoodieTableMetadataUtil.IDENTITY_ENCODING)
+        HoodieListData.eager(rawKeys.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
 
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
@@ -504,9 +500,11 @@ class ExpressionIndexSupport(spark: SparkSession,
 
   private def loadExpressionIndexPartitionStatRecords(indexDefinition: 
HoodieIndexDefinition, shouldReadInMemory: Boolean): 
HoodieData[HoodieMetadataColumnStats] = {
     // We are omitting the partition name and only using the column name and 
expression index partition stat prefix to fetch the records
-    val recordKeyPrefix = 
getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX,
 indexDefinition.getSourceFields.get(0))
+    // Create a ColumnStatsIndexKey with the column and partition prefix
+    val rawKey = new 
ColumnStatsIndexPrefixRawKey(indexDefinition.getSourceFields.get(0),
+      
toJavaOption(Option(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX)))
     val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = 
loadExpressionIndexForColumnsInternal(
-      indexDefinition.getIndexName, shouldReadInMemory, Seq(recordKeyPrefix))
+      indexDefinition.getIndexName, shouldReadInMemory, Seq(rawKey))
     //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
     colStatsRecords
   }
@@ -543,28 +541,28 @@ class ExpressionIndexSupport(spark: SparkSession,
                                                     indexPartition: String,
                                                     shouldReadInMemory: 
Boolean): HoodieData[HoodieMetadataColumnStats] = {
     // Read Metadata Table's Expression Index records into [[HoodieData]] 
container by
-    //    - Fetching the records from CSI by key-prefixes (encoded column 
names)
+    //    - Fetching the records from CSI by key-prefixes (column names)
     //    - Extracting [[HoodieMetadataColumnStats]] records
     //    - Filtering out nulls
     checkState(targetColumns.nonEmpty)
-    val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
-    val keyPrefixes = if (prunedPartitions.nonEmpty) {
-      prunedPartitions.map(partitionPath =>
-        new 
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
-      ).flatMap(encodedPartition => {
-        encodedTargetColumnNames.map(encodedTargetColumn => 
encodedTargetColumn.concat(encodedPartition))
-      })
+
+    // Create raw key prefixes based on column names and optional partition 
names
+    val rawKeys = if (prunedPartitions.nonEmpty) {
+      val partitionsList = prunedPartitions.toList
+      targetColumns.flatMap(colName =>
+        partitionsList.map(partitionPath => new 
ColumnStatsIndexPrefixRawKey(colName, partitionPath))
+      )
     } else {
-      encodedTargetColumnNames
+      targetColumns.map(colName => new ColumnStatsIndexPrefixRawKey(colName))
     }
-    loadExpressionIndexForColumnsInternal(indexPartition, shouldReadInMemory, 
keyPrefixes)
+
+    loadExpressionIndexForColumnsInternal(indexPartition, shouldReadInMemory, 
rawKeys)
   }
 
-  private def loadExpressionIndexForColumnsInternal(indexPartition: String, 
shouldReadInMemory: Boolean, keyPrefixes: Iterable[String] with (String with 
Int => Any)) = {
+  private def loadExpressionIndexForColumnsInternal(indexPartition: String, 
shouldReadInMemory: Boolean, keyPrefixes: 
Iterable[ColumnStatsIndexPrefixRawKey]) = {
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
-        HoodieListData.eager(keyPrefixes.toSeq.asJava), indexPartition, 
shouldReadInMemory,
-        HoodieTableMetadataUtil.IDENTITY_ENCODING)
+        HoodieListData.eager(keyPrefixes.toSeq.asJava), indexPartition, 
shouldReadInMemory)
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 5b5fcfeb12d4..0208628d4e73 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.ColumnIndexID
-import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{ColumnStatsIndexPrefixRawKey, 
HoodieMetadataPayload, HoodieTableMetadataUtil}
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
 import org.apache.hudi.util.JFunction
 
@@ -68,12 +68,12 @@ class PartitionStatsIndexSupport(spark: SparkSession,
 
   override def loadColumnStatsIndexRecords(targetColumns: Seq[String], 
prunedPartitions: Option[Set[String]] = None, shouldReadInMemory: Boolean): 
HoodieData[HoodieMetadataColumnStats] = {
     checkState(targetColumns.nonEmpty)
-    val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
-    logDebug(s"Loading column stats for columns: ${targetColumns.mkString(", 
")},  Encoded column names: ${encodedTargetColumnNames.mkString(", ")}")
+    logDebug(s"Loading column stats for columns: ${targetColumns.mkString(", 
")}")
+    // For partition stats, we only need column names (no partition name)
+    val rawKeys = targetColumns.map(colName => new 
ColumnStatsIndexPrefixRawKey(colName))
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
-        HoodieListData.eager(encodedTargetColumnNames.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory,
-        HoodieTableMetadataUtil.IDENTITY_ENCODING)
+        HoodieListData.eager(rawKeys.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory)
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 9a5b2eefdbc2..c7fa20cf302d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -75,7 +75,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
    * @return Sequence of file names which need to be queried
    */
   private def getCandidateFilesForRecordKeys(allFiles: Seq[StoragePath], 
recordKeys: List[String]): Set[String] = {
-    val recordIndexData = metadataTable.readRecordIndex(
+    val recordIndexData = metadataTable.readRecordIndexLocationsWithKeys(
         
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(recordKeys).asJava))
     try {
       val recordKeyLocationsMap = 
HoodieDataUtils.dedupeAndCollectAsMap(recordIndexData)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 1bc7b4f91b75..0a56f7f247b5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -79,7 +79,7 @@ class SecondaryIndexSupport(spark: SparkSession,
    * @return Sequence of file names which need to be queried
    */
   private def getCandidateFilesFromSecondaryIndex(allFiles: Seq[StoragePath], 
secondaryKeys: List[String], secondaryIndexName: String): Set[String] = {
-    val secondaryIndexData = metadataTable.readSecondaryIndex(
+    val secondaryIndexData = metadataTable.readSecondaryIndexLocationsWithKeys(
         
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava),
 secondaryIndexName)
     try {
       val recordKeyLocationsMap = 
HoodieDataUtils.dedupeAndCollectAsMap(secondaryIndexData)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 85609d2bf253..c659697be80e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -89,8 +89,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
-import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -104,9 +102,11 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieMetadataMetrics;
+import org.apache.hudi.metadata.RawKey;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -194,7 +194,6 @@ import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_D
 import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -1192,7 +1191,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     HoodieTableMetadata metadataReader = 
metaClient.getTableFormat().getMetadataFactory().create(
         context, storage, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath());
     HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData1 = 
metadataReader
-        
.readRecordIndex(HoodieListData.eager(records1.stream().map(HoodieRecord::getRecordKey)
+        
.readRecordIndexLocationsWithKeys(HoodieListData.eager(records1.stream().map(HoodieRecord::getRecordKey)
         .collect(Collectors.toList())));
     Map<String, HoodieRecordGlobalLocation> result;
     try {
@@ -1203,7 +1202,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     }
     
     HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData2 = 
metadataReader
-        
.readRecordIndex(HoodieListData.eager(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList())));
+        
.readRecordIndexLocationsWithKeys(HoodieListData.eager(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList())));
     try {
       result = HoodieDataUtils.dedupeAndCollectAsMap(recordIndexData2);
       assertEquals(records2.size(), result.size(), "RI should return entries 
in the commit.");
@@ -2012,19 +2011,20 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       HoodieTableMetadata tableMetadata = metadata(client, storage);
       // prefix search for column (_hoodie_record_key)
-      ColumnIndexID columnIndexID = new 
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      ColumnStatsIndexPrefixRawKey columnKey = new 
ColumnStatsIndexPrefixRawKey(HoodieRecord.RECORD_KEY_METADATA_FIELD);
       List<HoodieRecord<HoodieMetadataPayload>> result = 
tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
+          HoodieListData.lazy(Collections.singletonList(columnKey)),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
true).collectAsList();
 
       // there are 3 partitions in total and 2 commits. total entries should 
be 6.
       assertEquals(result.size(), 6);
 
       // prefix search for col(_hoodie_record_key) and first partition. only 2 
files should be matched
-      PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+      ColumnStatsIndexPrefixRawKey columnWithPartitionKey = new 
ColumnStatsIndexPrefixRawKey(HoodieRecord.RECORD_KEY_METADATA_FIELD,
+          HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
       result = tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
+          
HoodieListData.lazy(Collections.singletonList(columnWithPartitionKey)),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
true).collectAsList();
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
       result.forEach(entry -> {
@@ -2040,10 +2040,11 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       });
 
       // prefix search for column {commit time} and first partition
-      columnIndexID = new 
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+      ColumnStatsIndexPrefixRawKey commitTimeKey = new 
ColumnStatsIndexPrefixRawKey(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+          HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
       result = tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
+          HoodieListData.lazy(Collections.singletonList(commitTimeKey)),
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
true).collectAsList();
 
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
@@ -3030,9 +3031,16 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // assert entry is not present for deleted partition in metadata table
       HoodieTableMetadata tableMetadata = metadata(client, storage);
+      // Create a simple RawKey implementation for the partition path
+      RawKey partitionKey = new RawKey() {
+        @Override
+        public String encode() {
+          return HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+        }
+      };
       assertTrue(tableMetadata.getRecordsByKeyPrefixes(
-          
HoodieListData.lazy(Collections.singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
-          FILES.getPartitionPath(), false, IDENTITY_ENCODING).isEmpty());
+          HoodieListData.lazy(Collections.singletonList(partitionKey)),
+          FILES.getPartitionPath(), false).isEmpty());
       
assertTrue(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH));
       
assertFalse(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
       // above upsert would have triggered clean
@@ -3623,7 +3631,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       HoodieTableMetadata metadataReader = 
metaClient.getTableFormat().getMetadataFactory().create(
           context, storage, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath());
       Map<String, HoodieRecordGlobalLocation> result = 
HoodieDataUtils.dedupeAndCollectAsMap(
-          metadataReader.readRecordIndex(
+          metadataReader.readRecordIndexLocationsWithKeys(
               
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
       assertEquals(allRecords.size(), result.size(), "RI should have mapping 
for all the records in firstCommit");
 
@@ -3639,7 +3647,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // RI should not return mappings for deleted records
       metadataReader = metaClient.getTableFormat().getMetadataFactory().create(
           context, storage, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath());
-      result = 
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndex(
+      result = 
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndexLocationsWithKeys(
           
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
       assertEquals(allRecords.size() - recordsToDelete.size(), result.size(), 
"RI should not have mapping for deleted records");
       result.keySet().forEach(mappingKey -> 
assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for 
deleted records"));
@@ -3660,7 +3668,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // RI should not return mappings for deleted records
       metadataReader = 
metaClient.getTableFormat().getMetadataFactory().create(context, storage, 
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
       Map<String, HoodieRecordGlobalLocation> result =
-          HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndex(
+          
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndexLocationsWithKeys(
               
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
       assertEquals(allRecords.size() - keysToDelete.size(), result.size(), "RI 
should not have mapping for deleted records");
 
@@ -3671,7 +3679,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // New mappings should have been created for re-inserted records and 
should map to the new commit time
       metadataReader = 
metaClient.getTableFormat().getMetadataFactory().create(context, storage, 
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
-      result = 
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndex(
+      result = 
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndexLocationsWithKeys(
           
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
       assertEquals(allRecords.size(), result.size(), "RI should have mappings 
for re-inserted records");
       for (String reInsertedKey : keysToDelete) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 43729d13dcd8..3fd94ad5cb3a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -127,7 +127,7 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
     val readDf = spark.read.format("hudi").load(basePath)
     readDf.cache()
     val rowArr = readDf.collect()
-    val recordIndexMap = 
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndex(
+    val recordIndexMap = 
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndexLocationsWithKeys(
       
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(rowArr.map(row => 
row.getAs("_hoodie_record_key").toString).toList).asJava)))
 
     assertTrue(rowArr.length > 0)
@@ -141,7 +141,7 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
     }
 
     val deletedRows = deletedDf.collect()
-    val recordIndexMapForDeletedRows = 
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndex(
+    val recordIndexMapForDeletedRows = 
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndexLocationsWithKeys(
       
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(deletedRows.map(row 
=> row.getAs("_row_key").toString).toList).asJava)))
 
     assertEquals(0, recordIndexMapForDeletedRows.size(), "deleted records 
should not present in RLI")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index 4aeee163aba4..e1ece6d88ecb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -186,7 +186,7 @@ class TestMetadataRecordIndex extends 
HoodieSparkClientTestBase {
     val metadata = metadataWriter(writeConfig).getTableMetadata
     val readDf = spark.read.format("hudi").load(basePath)
     val rowArr = readDf.collect()
-    val res = metadata.readRecordIndex(
+    val res = metadata.readRecordIndexLocationsWithKeys(
       HoodieListData.eager(rowArr.map(row => 
row.getAs("_hoodie_record_key").toString).toList.asJava));
     val recordIndexMap = HoodieDataUtils.dedupeAndCollectAsMap(res);
     res.unpersistWithDependencies()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 8cc34c7aa756..aedd18e0ead3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -792,7 +792,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
   private def testSecondaryIndexWithNullableColumns(tableType: String): Unit = 
{
     withSparkSqlSessionConfig(
       HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "9",
-      "hoodie.embed.timeline.server" -> "false") {
+      "hoodie.embed.timeline.server" -> "false",
+      "hoodie.parquet.small.file.limit" -> "0") {
       withTempDir { tmp =>
         val tableName = generateTableName + s"_nullable_${tableType}"
         val basePath = s"${tmp.getCanonicalPath}/$tableName"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
index ac6641d81644..88a2c13436ed 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
@@ -64,7 +64,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
        |) using hudi
        | options (
        |  primaryKey ='id',
-       |  type = 'cow',
+       |  type = 'mor',
        |  preCombineField = 'ts',
        |  hoodie.metadata.enable = 'true',
        |  hoodie.metadata.record.index.enable = 'true',
@@ -123,6 +123,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
     // Create shared temporary directory
     tmpDir = Utils.createTempDir()
 
+    spark.sql("set hoodie.parquet.small.file.limit=0")
     // Setup shared test data
     setupSharedTestData()
   }
@@ -170,6 +171,9 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
     spark.sql(s"insert into $tableName" + " values('$a', 'sec$key', 40, 1001)")
     spark.sql(s"insert into $tableName" + " values('a$a', '$sec$', 50, 1002)")
     spark.sql(s"insert into $tableName" + " values('$$', '$$', 60, 1003)")
+    // generate some deleted records
+    spark.sql(s"insert into $tableName" + " values('$$3', '$$', 60, 1003)")
+    spark.sql(s"delete from $tableName" + " where id = '$$3'")
 
     val props = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
@@ -259,14 +263,14 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
     cleanUpCachedRDDs()
 
     // Case 1: Empty input
-    val emptyResultRDD = 
hoodieBackedTableMetadata.readRecordIndex(HoodieListData.eager(List.empty[String].asJava))
+    val emptyResultRDD = 
hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(HoodieListData.eager(List.empty[String].asJava))
     val emptyResult = emptyResultRDD.collectAsList()
     assert(emptyResult.isEmpty, "Empty input should return empty result")
     emptyResultRDD.unpersistWithDependencies()
 
     // Case 2: All existing keys including those with $ characters
     val allKeys = HoodieListData.eager(List("a1", "a2", "a$", "$a", "a$a", 
"$$").asJava)
-    val allResultRDD = hoodieBackedTableMetadata.readRecordIndex(allKeys)
+    val allResultRDD = 
hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(allKeys)
     val allResult = allResultRDD.collectAsList().asScala
     allResultRDD.unpersistWithDependencies()
     // Validate keys including special characters
@@ -293,14 +297,14 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
 
     // Case 3: Non-existing keys, some matches the prefix of the existing 
records.
     val nonExistKeys = HoodieListData.eager(List("", "a", "a100", "200", "$", 
"a$$", "$$a", "$a$").asJava)
-    val nonExistResultRDD = 
hoodieBackedTableMetadata.readRecordIndex(nonExistKeys)
+    val nonExistResultRDD = 
hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(nonExistKeys)
     val nonExistResult = nonExistResultRDD.collectAsList().asScala
     assert(nonExistResult.isEmpty, "Non-existing keys should return empty 
result")
     nonExistResultRDD.unpersistWithDependencies()
 
     // Case 4: Mix of existing and non-existing keys
     val mixedKeys = HoodieListData.eager(List("a1", "a100", "a2", 
"a200").asJava)
-    val mixedResultRDD = hoodieBackedTableMetadata.readRecordIndex(mixedKeys)
+    val mixedResultRDD = 
hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(mixedKeys)
     val mixedResult = mixedResultRDD.collectAsList().asScala
     val mixedResultKeys = mixedResult.map(_.getKey()).toSet
     assert(mixedResultKeys == Set("a1", "a2"), "Should only return existing 
keys")
@@ -308,7 +312,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
 
     // Case 5: Duplicate keys including those with $ characters
     val dupKeys = HoodieListData.eager(List("a1", "a1", "a2", "a2", "a$", 
"a$", "$a", "a$a", "a$a", "$a", "$$", "$$").asJava)
-    val dupResultRDD = hoodieBackedTableMetadata.readRecordIndex(dupKeys)
+    val dupResultRDD = 
hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(dupKeys)
     val dupResult = dupResultRDD.collectAsList().asScala
     val dupResultKeys = dupResult.map(_.getKey()).toSet
     assert(dupResultKeys == Set("a1", "a2", "a$", "$a", "a$a", "$$"), "Should 
deduplicate keys including those with $")
@@ -318,7 +322,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
     jsc = new JavaSparkContext(spark.sparkContext)
     context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
     val rddKeys = HoodieJavaRDD.of(List("a1", "a2", "a$").asJava, context, 2)
-    val rddResult = hoodieBackedTableMetadata.readRecordIndex(rddKeys)
+    val rddResult = 
hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(rddKeys)
     val rddResultKeys = rddResult.map(_.getKey()).collectAsList().asScala.toSet
     assert(rddResultKeys == Set("a1", "a2", "a$"), "Should deduplicate keys 
including those with $")
     rddResult.unpersistWithDependencies()
@@ -427,7 +431,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase 
extends HoodieSparkS
 
     // Test with existing secondary keys including those with $ characters
     val existingKeys = HoodieListData.eager(List("b1", "b2", "b$", "b$", "b1", 
"$$", "sec$key", "$sec$", "$$", null).asJava)
-    val result = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(existingKeys, 
secondaryIndexName)
+    val result = 
hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(existingKeys,
 secondaryIndexName)
     val resultMap = HoodieDataUtils.collectPairDataAsMap(result)
 
     assert(resultMap.size == 6, s"Should return 6 results for existing 
secondary keys in table version ${getTableVersion}")
@@ -441,14 +445,14 @@ abstract class 
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
 
     // Test with non-existing secondary keys
     val nonExistingKeys = HoodieListData.eager(List("", "b", "$", " ", null, 
"non_exist_1", "non_exist_2").asJava)
-    val nonExistingResult = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(nonExistingKeys, 
secondaryIndexName)
+    val nonExistingResult = 
hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(nonExistingKeys,
 secondaryIndexName)
     val nonExistingMap = 
HoodieDataUtils.collectPairDataAsMap(nonExistingResult)
     assert(nonExistingMap.isEmpty, s"Should return empty result for 
non-existing secondary keys in table version ${getTableVersion}")
     nonExistingResult.unpersistWithDependencies()
 
     // Test with a mixture of existing and non-existing secondary keys
     val rddKeys = HoodieJavaRDD.of(List("b1", "b2", "b$", null, "$").asJava, 
context, 2)
-    val rddResult = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(rddKeys, secondaryIndexName)
+    val rddResult = 
hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(rddKeys,
 secondaryIndexName)
 
     // Collect and validate results
     val rddResultMap = HoodieDataUtils.collectPairDataAsMap(rddResult)
@@ -610,7 +614,7 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase 
extends HoodieBackedTableMe
     }
 
     // Test getSecondaryIndexRecords API with null value
-    val nullRecordsResult = 
hoodieBackedTableMetadata.getSecondaryIndexRecords(nullKeys, secondaryIndexName)
+    val nullRecordsResult = 
hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(nullKeys,
 secondaryIndexName)
     val nullRecordsMap = 
HoodieDataUtils.collectPairDataAsMap(nullRecordsResult)
     nullRecordsResult.unpersistWithDependencies()
     // Verify that null key maps to record keys.
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b5b0fb072459..49327fa731d5 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1139,7 +1139,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     try {
       for (int i = 0; i < numPartitions; i++) {
         List<String> secKeys = secondaryKeys.collectPartitions(new int[] 
{i})[0];
-        HoodiePairData<String, String> secondaryIndexData = 
((HoodieBackedTableMetadata) 
metadataContext.tableMetadata).getSecondaryIndexRecords(
+        HoodiePairData<String, String> secondaryIndexData = 
((HoodieBackedTableMetadata) 
metadataContext.tableMetadata).readSecondaryIndexDataTableRecordKeysWithKeys(
             HoodieListData.lazy(secKeys), indexDefinition.getIndexName());
         try {
           Map<String, Set<String>> mdtSecondaryKeyToRecordKeys = 
HoodieDataUtils.collectPairDataAsMap(secondaryIndexData);


Reply via email to