This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ff60d3213651 [HUDI-9548] Secondary index lookup should do escape -> 
hash -> sort for read write (#13521)
ff60d3213651 is described below

commit ff60d32136516bb8f652471db9d2373b00ded458
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Wed Jul 23 18:26:12 2025 -0700

    [HUDI-9548] Secondary index lookup should do escape -> hash -> sort for 
read write (#13521)
---
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  7 +-
 .../client/utils/SparkMetadataWriterUtils.java     |  3 +-
 .../hudi/common/table/view/NoOpTableMetadata.java  |  2 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    | 10 ++-
 .../metadata/FileSystemBackedTableMetadata.java    |  2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 94 ++++++++++++++--------
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  2 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  8 +-
 .../hudi/metadata/SecondaryIndexKeyUtils.java      |  8 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java | 24 ++++--
 .../apache/hudi/source/stats/FileStatsIndex.java   |  4 +-
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  2 +-
 .../org/apache/hudi/ExpressionIndexSupport.scala   |  4 +-
 .../apache/hudi/PartitionStatsIndexSupport.scala   |  2 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |  9 ++-
 15 files changed, 115 insertions(+), 66 deletions(-)

diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index cd55d027d6f5..0c9e4df761da 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -164,6 +164,7 @@ import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
@@ -1473,7 +1474,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       ColumnIndexID columnIndexID = new 
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
       List<HoodieRecord<HoodieMetadataPayload>> result = 
tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
Option.empty()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
 
       // there are 3 partitions in total and 2 commits. total entries should 
be 6.
       assertEquals(result.size(), 6);
@@ -1485,7 +1486,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
       result = tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
Option.empty()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
       result.forEach(entry -> {
@@ -1505,7 +1506,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       columnIndexID = new 
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
       result = tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
Option.empty()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
 
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 3920ea2c50ae..cddb24c23981 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -108,6 +108,7 @@ import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemViewForMetadataTable;
@@ -401,7 +402,7 @@ public class SparkMetadataWriterUtils {
         // Fetch EI column stat records for above files
         List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
             tableMetadata.getRecordsByKeyPrefixes(
-                    
HoodieListData.lazy(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
 partitionName)), indexPartition, false, Option.empty())
+                    
HoodieListData.lazy(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
 partitionName)), indexPartition, false, IDENTITY_ENCODING)
                 // schema and properties are ignored in getInsertValue, so 
simply pass as null
                 .map(record -> record.getData().getInsertValue(null, null))
                 .filter(Option::isPresent)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 6f0b9d0695c3..6c7cc33f934f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -118,7 +118,7 @@ class NoOpTableMetadata implements HoodieTableMetadata {
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
                                                                                
  String partitionName,
                                                                                
  boolean shouldLoadInMemory,
-                                                                               
  Option<SerializableFunctionUnchecked<String, String>> keyEncoder) {
+                                                                               
  SerializableFunctionUnchecked<String, String> keyEncoder) {
     throw new HoodieMetadataException("Unsupported operation: 
getRecordsByKeyPrefixes!");
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 0b071b5bfd5c..1519e38de244 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -66,6 +66,8 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
+
 /**
  * Abstract class for implementing common table metadata operations.
  */
@@ -210,7 +212,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     List<String> partitionIDFileIDStringsList = new 
ArrayList<>(partitionIDFileIDStrings);
     Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
         HoodieDataUtils.dedupeAndCollectAsMap(
-            
getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList), 
metadataPartitionName, Option.empty()));
+            
getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList), 
metadataPartitionName, IDENTITY_ENCODING));
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, 
timer.endTimer()));
     metrics.ifPresent(m -> 
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR, 
partitionIDFileIDStringsList.size()));
 
@@ -330,7 +332,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     Map<String, HoodieRecord<HoodieMetadataPayload>> partitionIdRecordPairs =
         HoodieDataUtils.dedupeAndCollectAsMap(
             getRecordsByKeys(HoodieListData.eager(new 
ArrayList<>(partitionIdToPathMap.keySet())),
-                MetadataPartitionType.FILES.getPartitionPath(), 
Option.empty()));
+                MetadataPartitionType.FILES.getPartitionPath(), 
IDENTITY_ENCODING));
     metrics.ifPresent(
         m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, 
timer.endTimer()));
 
@@ -387,7 +389,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
     Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
         HoodieDataUtils.dedupeAndCollectAsMap(
             getRecordsByKeys(
-                HoodieListData.eager(columnStatKeylist), 
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), Option.empty()));
+                HoodieListData.eager(columnStatKeylist), 
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), IDENTITY_ENCODING));
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, 
timer.endTimer()));
     Map<Pair<String, String>, List<HoodieMetadataColumnStats>> 
fileToColumnStatMap = new HashMap<>();
     for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry : 
hoodieRecords.entrySet()) {
@@ -436,7 +438,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
    * @return A collection of pairs (key -> record)
    */
   public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(
-          HoodieData<String> keys, String partitionName, 
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn);
+          HoodieData<String> keys, String partitionName, 
SerializableFunctionUnchecked<String, String> keyEncodingFn);
 
   /**
    * Returns a collection of pairs (secondary-key -> set-of-record-keys) for 
the provided secondary keys.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 06097655e7ca..6b01180696af 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -314,7 +314,7 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
                                                                                
  String partitionName,
                                                                                
  boolean shouldLoadInMemory,
-                                                                               
  Option<SerializableFunctionUnchecked<String, String>> keyEncoder) {
+                                                                               
  SerializableFunctionUnchecked<String, String> keyEncoder) {
     throw new HoodieMetadataException("Unsupported operation: 
getRecordsByKeyPrefixes!");
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 79964298cf1b..8d199f3ecff5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -95,12 +95,12 @@ import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BAS
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemViewForMetadataTable;
-import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.unescapeSpecialChars;
 
 /**
  * Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -168,7 +168,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   @Override
   protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key, String partitionName) {
     List<HoodieRecord<HoodieMetadataPayload>> records = getRecordsByKeys(
-        HoodieListData.eager(Collections.singletonList(key)), partitionName, 
Option.empty())
+        HoodieListData.eager(Collections.singletonList(key)), partitionName, 
IDENTITY_ENCODING)
         .values().collectAsList();
     ValidationUtils.checkArgument(records.size() <= 1, "Found more than 1 
record for record key " + key);
     return records.isEmpty() ? Option.empty() : 
Option.ofNullable(records.get(0));
@@ -213,12 +213,10 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
                                                                                
  String partitionName,
                                                                                
  boolean shouldLoadInMemory,
-                                                                               
  Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+                                                                               
  SerializableFunctionUnchecked<String, String> keyEncodingFn) {
     ValidationUtils.checkState(keyPrefixes instanceof HoodieListData, 
"getRecordsByKeyPrefixes only support HoodieListData at the moment");
-    // Apply key encoding if present
-    List<String> sortedKeyPrefixes = keyEncodingFn.isPresent()
-        ? new ArrayList<>(keyPrefixes.map(k -> 
keyEncodingFn.get().apply(k)).collectAsList())
-        : new ArrayList<>(keyPrefixes.collectAsList());
+    // Apply key encoding
+    List<String> sortedKeyPrefixes = new 
ArrayList<>(keyPrefixes.map(keyEncodingFn::apply).collectAsList());
     // Sort the prefixes so that keys are looked up in order
     // Sort must come after encoding.
     Collections.sort(sortedKeyPrefixes);
@@ -263,12 +261,29 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return Predicates.startsWithAny(null, right);
   }
 
+  /**
+   * All keys to be looked up go through the following steps:
+   * 1. [encode] escape/encode the key if needed
+   * 2. [hash to file group] compute the hash of the key to
+   * 3. [lookup within file groups] lookup the key in the file group
+   * 4. the record is returned
+   */
+  /**
+   * Performs lookup of records in the metadata table.
+   *
+   * @param keys               The keys to look up in the metadata table
+   * @param partitionName      The name of the metadata partition to search in
+   * @param fileSlices         The list of file slices to search through
+   * @param isSecondaryIndex   Whether this lookup is for a secondary index
+   * @param keyEncodingFn      Optional function to encode keys before lookup
+   * @return Pair data containing the looked up records keyed by their 
original keys
+   */
   private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
doLookup(HoodieData<String> keys, String partitionName, List<FileSlice> 
fileSlices,
-                                                                               
boolean isSecondaryIndex, Option<SerializableFunctionUnchecked<String, String>> 
keyEncodingFn) {
+                                                                               
boolean isSecondaryIndex, SerializableFunctionUnchecked<String, String> 
keyEncodingFn) {
 
     final int numFileSlices = fileSlices.size();
     if (numFileSlices == 1) {
-      List<String> keysList = keyEncodingFn.isPresent() ? keys.map(k -> 
keyEncodingFn.get().apply(k)).collectAsList() : keys.collectAsList();
+      List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
       TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
       return lookupKeyRecordPairs(partitionName, new 
ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex);
     }
@@ -277,12 +292,14 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // SI write path concatenates secKey$recordKey, the secKey needs extracted 
for hashing;
     // SI read path gives secKey only, no need for secKey extraction.
     SerializableBiFunction<String, Integer, Integer> mappingFunction = 
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
-    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction);
+    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction, keyEncodingFn);
     HoodiePairData<Integer, String> persistedInitialPairData = keys
-        // Tag key with file group index and apply key encoding
-        .mapToPair(recordKey -> new ImmutablePair<>(
-            mappingFunction.apply(recordKey, numFileSlices),
-            keyEncodingFn.isPresent() ? keyEncodingFn.get().apply(recordKey) : 
recordKey));
+        // Tag key with file group index
+        .mapToPair(recordKey -> {
+          String encodedKey = keyEncodingFn.apply(recordKey);
+          // Always encode the key before apply mapping.
+          return new ImmutablePair<>(mappingFunction.apply(encodedKey, 
numFileSlices), encodedKey);
+        });
     persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
     // Use the new processValuesOfTheSameShards API instead of explicit 
rangeBasedRepartitionForEachKey
     SerializableFunction<Iterator<String>, Iterator<Pair<String, 
HoodieRecord<HoodieMetadataPayload>>>> processFunction =
@@ -295,7 +312,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
             }
             distinctSortedKeyIter.forEachRemaining(keysList::add);
           }
-          FileSlice fileSlice = 
fileSlices.get(mappingFunction.apply(unescapeSpecialChars(keysList.get(0)), 
numFileSlices));
+          FileSlice fileSlice = 
fileSlices.get(mappingFunction.apply(keysList.get(0), numFileSlices));
           return lookupKeyRecordPairsItr(partitionName, keysList, fileSlice, 
!isSecondaryIndex);
         };
 
@@ -307,12 +324,19 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return result.filter((String k, HoodieRecord<HoodieMetadataPayload> v) -> 
!v.getData().isDeleted());
   }
 
+  /**
+   * All keys to be looked up go through the following steps:
+   * 1. [encode] escape/encode the key if needed
+   * 2. [hash to file group] compute the hash of the key to
+   * 3. [lookup within file groups] lookup the key in the file group
+   * 4. the record is returned
+   */
   private HoodieData<HoodieRecord<HoodieMetadataPayload>> 
doLookupIndexRecords(HoodieData<String> keys, String partitionName, 
List<FileSlice> fileSlices,
-                                                                               
boolean isSecondaryIndex, Option<SerializableFunctionUnchecked<String, String>> 
keyEncodingFn) {
+                                                                               
boolean isSecondaryIndex, SerializableFunctionUnchecked<String, String> 
keyEncodingFn) {
 
     final int numFileSlices = fileSlices.size();
     if (numFileSlices == 1) {
-      List<String> keysList = keyEncodingFn.isPresent() ? keys.map(k -> 
keyEncodingFn.get().apply(k)).collectAsList() : keys.collectAsList();
+      List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
       TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
       return lookupRecords(partitionName, new ArrayList<>(distinctSortedKeys), 
fileSlices.get(0), !isSecondaryIndex);
     }
@@ -321,12 +345,14 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // SI write path concatenates secKey$recordKey, the secKey needs extracted 
for hashing;
     // SI read path gives secKey only, no need for secKey extraction.
     SerializableBiFunction<String, Integer, Integer> mappingFunction = 
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
-    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction);
+    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, 
mappingFunction, keyEncodingFn);
     HoodiePairData<Integer, String> persistedInitialPairData = keys
         // Tag key with file group index
-        .mapToPair(recordKey -> new ImmutablePair<>(
-            mappingFunction.apply(recordKey, numFileSlices),
-            keyEncodingFn.isPresent() ? keyEncodingFn.get().apply(recordKey) : 
recordKey));
+        .mapToPair(recordKey -> {
+          String encodedKey = keyEncodingFn.apply(recordKey);
+          // Always encode the key before apply mapping.
+          return new ImmutablePair<>(mappingFunction.apply(encodedKey, 
numFileSlices), encodedKey);
+        });
     persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
 
     // Use the new processValuesOfTheSameShards API instead of explicit 
rangeBasedRepartitionForEachKey
@@ -340,7 +366,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
             }
             distinctSortedKeyIter.forEachRemaining(keysList::add);
           }
-          FileSlice fileSlice = 
fileSlices.get(mappingFunction.apply(unescapeSpecialChars(keysList.get(0)), 
numFileSlices));
+          FileSlice fileSlice = 
fileSlices.get(mappingFunction.apply(keysList.get(0), numFileSlices));
           return lookupRecordsItr(partitionName, keysList, fileSlice, 
!isSecondaryIndex);
         };
     List<Integer> keySpace = IntStream.range(0, 
numFileSlices).boxed().collect(Collectors.toList());
@@ -367,7 +393,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         "Record index is not initialized in MDT");
 
     // TODO [HUDI-9544]: Metric does not work for rdd based API due to lazy 
evaluation.
-    return getRecordsByKeys(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), Option.empty())
+    return getRecordsByKeys(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
         .mapToPair((Pair<String, HoodieRecord<HoodieMetadataPayload>> p) -> 
Pair.of(p.getLeft(), p.getRight().getData().getRecordGlobalLocation()));
   }
 
@@ -386,7 +412,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // The caller is required to check for record index existence in MDT 
before calling this method.
     
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
         "Record index is not initialized in MDT");
-    return readIndexRecords(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), Option.empty())
+    return readIndexRecords(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
         .map(r -> r.getData().getRecordGlobalLocation());
   }
 
@@ -432,7 +458,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
   @Override
   public HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(
-      HoodieData<String> keys, String partitionName, 
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+      HoodieData<String> keys, String partitionName, 
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
     List<FileSlice> fileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
getMetadataFileSystemView(), partitionName));
     checkState(!fileSlices.isEmpty(), "No file slices found for partition: " + 
partitionName);
@@ -442,7 +468,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   public HoodieData<String> 
getRecordKeysFromSecondaryKeysV2(HoodieData<String> secondaryKeys, String 
partitionName) {
-    return readIndexRecords(secondaryKeys, partitionName, 
Option.of(SecondaryIndexKeyUtils::escapeSpecialChars)).map(
+    return readIndexRecords(secondaryKeys, partitionName, 
SecondaryIndexKeyUtils::escapeSpecialChars).map(
         hoodieRecord -> 
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(hoodieRecord.getRecordKey()));
   }
 
@@ -469,7 +495,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
 
   protected HoodieData<HoodieRecord<HoodieMetadataPayload>> 
readIndexRecords(HoodieData<String> keys,
                                                                              
String partitionName,
-                                                                             
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+                                                                             
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
     List<FileSlice> fileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
         k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
getMetadataFileSystemView(), partitionName));
     checkState(!fileSlices.isEmpty(), "No file slices found for partition: " + 
partitionName);
@@ -481,16 +507,18 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   // When testing we noticed that the parallelism can be very low which hurts 
the performance. so we should start with a reasonable
   // level of parallelism in that case.
   private HoodieData<String> repartitioningIfNeeded(
-      HoodieData<String> keys, String partitionName, int numFileSlices, 
SerializableBiFunction<String, Integer, Integer> mappingFunction) {
+      HoodieData<String> keys, String partitionName, int numFileSlices, 
SerializableBiFunction<String, Integer, Integer> mappingFunction,
+      SerializableFunctionUnchecked<String, String> keyEncodingFn) {
     if (keys instanceof HoodieListData) {
-      int parallelism = (int) keys.map(k -> mappingFunction.apply(k, 
numFileSlices)).distinct().count();
+      int parallelism;
+      parallelism = (int) keys.map(k -> 
mappingFunction.apply(keyEncodingFn.apply(k), 
numFileSlices)).distinct().count();
       // In case of empty lookup set, we should avoid RDD with 0 partitions.
       parallelism = Math.max(parallelism, 1);
-      LOG.info("getRecordFast repartition HoodieListData to JavaRDD: exit, 
partitionName {}, num partitions: {}",
+      LOG.info("Repartitioning keys for partition {} from list data with 
parallelism: {}",
           partitionName, parallelism);
       keys = getEngineContext().parallelize(keys.collectAsList(), parallelism);
     } else if (keys.getNumPartitions() < 
metadataConfig.getRepartitionMinPartitionsThreshold()) {
-      LOG.info("getRecordFast repartition HoodieNonListData. partitionName {}, 
num partitions: {}", partitionName, 
metadataConfig.getRepartitionDefaultPartitions());
+      LOG.info("Repartitioning keys for partition {} to {} partitions", 
partitionName, metadataConfig.getRepartitionDefaultPartitions());
       keys = 
keys.repartition(metadataConfig.getRepartitionDefaultPartitions());
     }
     return keys;
@@ -732,7 +760,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
       return HoodieListPairData.eager(Collections.emptyList());
     }
 
-    Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys, 
partitionName, false, Option.of(SecondaryIndexKeyUtils::escapeSpecialChars))
+    Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys, 
partitionName, false, SecondaryIndexKeyUtils::escapeSpecialChars)
             .map(record -> {
               if (!record.getData().isDeleted()) {
                 return 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(record.getRecordKey());
@@ -761,7 +789,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     if (secondaryKeys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
-    return readIndexRecords(secondaryKeys, partitionName, 
Option.of(SecondaryIndexKeyUtils::escapeSpecialChars))
+    return readIndexRecords(secondaryKeys, partitionName, 
SecondaryIndexKeyUtils::escapeSpecialChars)
         .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
         .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getRecordKeySecondaryKeyPair(hoodieRecord.getRecordKey()))
         .groupByKey()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 1e6e5c466319..8035d31af5c2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -265,7 +265,7 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
   HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
                                                                           
String partitionName,
                                                                           
boolean shouldLoadInMemory,
-                                                                          
Option<SerializableFunctionUnchecked<String, String>> keyEncoder);
+                                                                          
SerializableFunctionUnchecked<String, String> keyEncoder);
 
   /**
    * Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 568c54975342..535dedf148b1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.FileSlice;
@@ -210,6 +211,7 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_EXPRESSION_INDEX_PREFIX = 
"expr_index_";
   public static final String PARTITION_NAME_SECONDARY_INDEX = 
"secondary_index";
   public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX = 
"secondary_index_";
+  public static final SerializableFunctionUnchecked<String, String> 
IDENTITY_ENCODING = key -> key;
 
   private static final Set<Schema.Type> SUPPORTED_TYPES_PARTITION_STATS = new 
HashSet<>(Arrays.asList(
       Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, 
Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.NULL, 
Schema.Type.BYTES));
@@ -1343,7 +1345,7 @@ public class HoodieTableMetadataUtil {
    * (either all containing the secondary index separator or none containing 
it).
    * <p>
    * For secondary index partitions (version >= 2), if the record keys contain 
the secondary index separator,
-   * the secondary key portion is used for hashing. Otherwise, the full record 
key is used.
+   * the unescaped secondary key portion is used for hashing. Otherwise, the 
full record key is used.
    *
    * @param needsSecondaryKeyExtraction Whether to extract secondary key from 
composite keys (should be determined by caller)
    *
@@ -1352,7 +1354,7 @@ public class HoodieTableMetadataUtil {
   public static SerializableBiFunction<String, Integer, Integer> 
getSecondaryKeyToFileGroupMappingFunction(boolean needsSecondaryKeyExtraction) {
     if (needsSecondaryKeyExtraction) {
       return (recordKey, numFileGroups) -> {
-        String secondaryKey = 
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(recordKey);
+        String secondaryKey = 
SecondaryIndexKeyUtils.getUnescapedSecondaryKeyFromSecondaryIndexKey(recordKey);
         return mapRecordKeyToFileGroupIndex(secondaryKey, numFileGroups);
       };
     }
@@ -2716,7 +2718,7 @@ public class HoodieTableMetadataUtil {
           // Fetch metadata table COLUMN_STATS partition records for above 
files
           List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata 
= tableMetadata
               .getRecordsByKeyPrefixes(
-                  HoodieListData.lazy(generateKeyPrefixes(colsToIndex, 
partitionName)), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false, 
Option.empty())
+                  HoodieListData.lazy(generateKeyPrefixes(colsToIndex, 
partitionName)), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false, 
IDENTITY_ENCODING)
               // schema and properties are ignored in getInsertValue, so 
simply pass as null
               .map(record -> 
((HoodieMetadataPayload)record.getData()).getColumnStatMetadata())
               .filter(Option::isPresent)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
index 19a3bded29b1..e6c759b5fde1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
@@ -53,10 +53,16 @@ public class SecondaryIndexKeyUtils {
   }
 
   public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the secondary key from the payload key
+    return 
unescapeSpecialChars(getUnescapedSecondaryKeyFromSecondaryIndexKey(key));
+  }
+
+  public static String getUnescapedSecondaryKeyFromSecondaryIndexKey(String 
key) {
     // the payload key is in the format of "secondaryKey$primaryKey"
     // we need to extract the secondary key from the payload key
     int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
-    return unescapeSpecialChars(key.substring(0, delimiterIndex));
+    return key.substring(0, delimiterIndex);
   }
 
   public static String constructSecondaryIndexKey(String secondaryKey, String 
recordKey) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index e7f50dbb820e..5900fdafbdde 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -22,31 +22,39 @@ import 
org.apache.hudi.common.function.SerializableBiFunction;
 
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.constructSecondaryIndexKey;
+import static 
org.apache.hudi.metadata.SecondaryIndexKeyUtils.escapeSpecialChars;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 public class TestHoodieTableMetadataUtil {
 
   @Test
-  public void testGetRecordKeyToFileGroupIndexFunctionOptimized() {
+  public void testGetRecordKeyToFileGroupIndexFunction() {
     // Test with secondary key format
     String compositeKey = "secondaryKey$recordKey";
 
-    SerializableBiFunction<String, Integer, Integer> optimizedFunction =
+    SerializableBiFunction<String, Integer, Integer> hashOnSecKeyOnly =
         
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(true);
+    SerializableBiFunction<String, Integer, Integer> hashOnFullKey =
+        
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(false);
 
-    int result1 = optimizedFunction.apply(compositeKey, 10);
-    int result2 = 
optimizedFunction.apply("anotherSecondaryKey$anotherRecordKey", 10);
+    int result1 = hashOnSecKeyOnly.apply(compositeKey, 10);
+    int result2 = 
hashOnSecKeyOnly.apply("anotherSecondaryKey$anotherRecordKey", 10);
 
     // Both should hash the secondary key portion
     assertNotEquals(result1, result2);
 
     // Test with regular key format
-    optimizedFunction = 
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(false);
-
-    int result3 = optimizedFunction.apply("simpleKey", 10);
-    int result4 = optimizedFunction.apply("anotherSimpleKey", 10);
+    int result3 = hashOnFullKey.apply("simpleKey", 10);
+    int result4 = hashOnFullKey.apply("anotherSimpleKey", 10);
 
     // Both should hash the full key
     assertNotEquals(result3, result4);
+
+    // Hash on Sec key only <=> Hash full key if key equals to UNESCAPED sec 
key
+    assertEquals(hashOnSecKeyOnly.apply("secKey$recKey", 10), 
hashOnFullKey.apply("secKey", 10));
+    assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("seckey", 
"reckey"), 10), hashOnFullKey.apply("seckey", 10));
+    assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("$", 
"reckey"), 10), hashOnFullKey.apply(escapeSpecialChars("$"), 10));
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index 073491639cfc..0c1c296feab4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
@@ -64,6 +63,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_DATA_TYPE;
 import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_TARGET_POS;
 import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.METADATA_DATA_TYPE;
@@ -397,7 +397,7 @@ public class FileStatsIndex implements ColumnStatsIndex {
 
     HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
         getMetadataTable().getRecordsByKeyPrefixes(
-            HoodieListData.lazy(encodedTargetColumnNames), 
getIndexPartitionName(), false, Option.empty());
+            HoodieListData.lazy(encodedTargetColumnNames), 
getIndexPartitionName(), false, IDENTITY_ENCODING);
 
     org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter 
converter =
         AvroToRowDataConverters.createRowConverter((RowType) 
METADATA_DATA_TYPE.getLogicalType());
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 60d4df93c18f..204a50ca2a03 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -358,7 +358,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
         HoodieListData.eager(keyPrefixes.toSeq.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
-        toJavaOption(Option.empty[SerializableFunctionUnchecked[String, 
String]]))
+        HoodieTableMetadataUtil.IDENTITY_ENCODING)
 
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 0dab9f6b3f65..f81bda62ac38 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -360,7 +360,7 @@ class ExpressionIndexSupport(spark: SparkSession,
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
         HoodieListData.eager(keyPrefixes.toSeq.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
-        toJavaOption(Option.empty[SerializableFunctionUnchecked[String, 
String]]))
+        HoodieTableMetadataUtil.IDENTITY_ENCODING)
 
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
@@ -564,7 +564,7 @@ class ExpressionIndexSupport(spark: SparkSession,
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
         HoodieListData.eager(keyPrefixes.toSeq.asJava), indexPartition, 
shouldReadInMemory,
-        toJavaOption(Option.empty[SerializableFunctionUnchecked[String, 
String]]))
+        HoodieTableMetadataUtil.IDENTITY_ENCODING)
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index f00cd9c20f48..5b5fcfeb12d4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -73,7 +73,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
     val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
       metadataTable.getRecordsByKeyPrefixes(
         HoodieListData.eager(encodedTargetColumnNames.asJava), 
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory,
-        toJavaOption(Option.empty[SerializableFunctionUnchecked[String, 
String]]))
+        HoodieTableMetadataUtil.IDENTITY_ENCODING)
     val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
       metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 8f3492e75189..3d0661b5e22a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -193,6 +193,7 @@ import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_D
 import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -1999,7 +2000,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       ColumnIndexID columnIndexID = new 
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
       List<HoodieRecord<HoodieMetadataPayload>> result = 
tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
Option.empty()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
 
       // there are 3 partitions in total and 2 commits. total entries should 
be 6.
       assertEquals(result.size(), 6);
@@ -2008,7 +2009,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       PartitionIndexID partitionIndexID = new 
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
       result = tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
Option.empty()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
       result.forEach(entry -> {
@@ -2027,7 +2028,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       columnIndexID = new 
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
       result = tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
Option.empty()).collectAsList();
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true, 
IDENTITY_ENCODING).collectAsList();
 
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
@@ -3016,7 +3017,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       HoodieTableMetadata tableMetadata = metadata(client, storage);
       assertTrue(tableMetadata.getRecordsByKeyPrefixes(
           
HoodieListData.lazy(Collections.singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
-          FILES.getPartitionPath(), false, Option.empty()).isEmpty());
+          FILES.getPartitionPath(), false, IDENTITY_ENCODING).isEmpty());
       
assertTrue(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH));
       
assertFalse(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
       // above upsert would have triggered clean

Reply via email to