This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 30fcb2a77e19 feat(metadata): Fix partitioned RLI lookup to use full record key (#19026)
30fcb2a77e19 is described below

commit 30fcb2a77e19619071cafb7696b8ed3dde88fd9c
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Jun 17 15:49:26 2026 +0800

    feat(metadata): Fix partitioned RLI lookup to use full record key (#19026)
---
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  2 +-
 .../hudi/functional/TestRecordLevelIndex.scala     | 35 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index a7e53e226033..21a18f199245 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -279,7 +279,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
       // all keys will be from the same shard index so just calculate the first key and reduce partitionFileSlices to 1
       TreeSet<String> distinctSortedKeys = getDistinctSortedKeysForSingleSlice(keys);
       int fileGroupIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(distinctSortedKeys.stream().findFirst().get(), fileSlicesForDataPartition.size());
-      return readSliceAndFilterByKeysIntoList(partitionName, distinctSortedKeys, fileSlicesForDataPartition.get(fileGroupIndex), false);
+      return readSliceAndFilterByKeysIntoList(partitionName, distinctSortedKeys, fileSlicesForDataPartition.get(fileGroupIndex), true);
     } else if (partitionName.equals(RECORD_INDEX.getPartitionPath()) && !fileSlices.isEmpty() && HoodieTableMetadataUtil.verifyRLIFile(fileSlices.get(0).getFileId(), true)) {
       if (keys.isEmpty()) {
         return HoodieListData.lazy(Collections.emptyList());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index cf63ea915630..e7015b5ab81e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -393,6 +393,41 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
     assertEquals(0, partition3Locations.size)
   }
 
+  @Test
+  def testPartitionedRecordLevelIndexLookupUsesFullKey(): Unit = {
+    initMetaClient(HoodieTableType.COPY_ON_WRITE)
+    val dataGen = new HoodieTestDataGenerator()
+    val inserts = dataGen.generateInserts("001", 3)
+    val insertDf = toDataset(spark, inserts).withColumn("data_partition_path", lit("partition1"))
+    val options = Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "data_partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key() -> "false",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+    insertDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val writeConfig = getWriteConfig(options)
+    val metadata = metadataWriter(writeConfig).getTableMetadata
+    val recordKeys = inserts.asScala.map(i => i.getRecordKey).asJava
+    assertEquals(3, readRecordIndex(metadata, recordKeys, HOption.of("partition1")).size)
+
+    // Partitioned RLI lookup is still a full record-key lookup within the data partition.
+    // A prefix-only key would incorrectly match real records if the lookup used prefix matching.
+    val recordKeySet = recordKeys.asScala.toSet
+    val prefixOnlyKey = recordKeys.asScala
+      .flatMap(key => (1 until key.length).map(key.substring(0, _)))
+      .find(prefix => !recordKeySet.contains(prefix))
+      .get
+    assertTrue(readRecordIndex(metadata, util.Collections.singletonList(prefixOnlyKey), HOption.of("partition1")).isEmpty)
+  }
+
   @ParameterizedTest
   @MethodSource(Array("testArgsForPartitionedRecordLevelIndex"))
   def testPartitionedRecordLevelIndexInitializationBasic(testCase: TestPartitionedRecordLevelIndexTestCase): Unit = {

Reply via email to