This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 30fcb2a77e19 feat(metadata): Fix partitioned RLI lookup to use full record key (#19026)
30fcb2a77e19 is described below
commit 30fcb2a77e19619071cafb7696b8ed3dde88fd9c
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Jun 17 15:49:26 2026 +0800
feat(metadata): Fix partitioned RLI lookup to use full record key (#19026)
---
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../hudi/functional/TestRecordLevelIndex.scala | 35 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index a7e53e226033..21a18f199245 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -279,7 +279,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// all keys will be from the same shard index so just calculate the
first key and reduce partitionFileSlices to 1
TreeSet<String> distinctSortedKeys =
getDistinctSortedKeysForSingleSlice(keys);
int fileGroupIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(distinctSortedKeys.stream().findFirst().get(),
fileSlicesForDataPartition.size());
- return readSliceAndFilterByKeysIntoList(partitionName,
distinctSortedKeys, fileSlicesForDataPartition.get(fileGroupIndex), false);
+ return readSliceAndFilterByKeysIntoList(partitionName,
distinctSortedKeys, fileSlicesForDataPartition.get(fileGroupIndex), true);
} else if (partitionName.equals(RECORD_INDEX.getPartitionPath()) &&
!fileSlices.isEmpty() &&
HoodieTableMetadataUtil.verifyRLIFile(fileSlices.get(0).getFileId(), true)) {
if (keys.isEmpty()) {
return HoodieListData.lazy(Collections.emptyList());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index cf63ea915630..e7015b5ab81e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -393,6 +393,41 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase with SparkDatasetMix
assertEquals(0, partition3Locations.size)
}
+  /**
+   * Regression test for lookups against the partitioned (non-global) record-level index.
+   *
+   * Writes three records into the single data partition "partition1" and then checks that:
+   *  - looking up all three full record keys in "partition1" returns three entries, and
+   *  - looking up a strict prefix of a record key (one that is not itself a record key)
+   *    returns nothing, i.e. the lookup matches whole keys rather than key prefixes.
+   */
+  @Test
+  def testPartitionedRecordLevelIndexLookupUsesFullKey(): Unit = {
+    initMetaClient(HoodieTableType.COPY_ON_WRITE)
+    val dataGen = new HoodieTestDataGenerator()
+    val inserts = dataGen.generateInserts("001", 3)
+    val insertDf = toDataset(spark, inserts).withColumn("data_partition_path", lit("partition1"))
+    val options = Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "data_partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      // Enable only the non-global record-level index; the global variant and the
+      // secondary index are explicitly turned off.
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key() -> "false",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+    insertDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val writeConfig = getWriteConfig(options)
+    val metadata = metadataWriter(writeConfig).getTableMetadata
+    val recordKeys = inserts.asScala.map(_.getRecordKey).asJava
+    assertEquals(3, readRecordIndex(metadata, recordKeys, HOption.of("partition1")).size)
+
+    // Partitioned RLI lookup is still a full record-key lookup within the data partition.
+    // A prefix-only key would incorrectly match real records if the lookup used prefix matching.
+    // NOTE(review): the partitioned lookup appears to read only the file slice chosen by hashing
+    // the first sorted key, so this check can only catch prefix matching when the prefix hashes
+    // to the same file group as the record it prefixes (e.g. a single RLI file group per
+    // partition). Confirm the file-group count used here, or pick a prefix in the same group.
+    val recordKeySet = recordKeys.asScala.toSet
+    val prefixOnlyKey = recordKeys.asScala
+      .flatMap(key => (1 until key.length).map(key.substring(0, _)))
+      .find(prefix => !recordKeySet.contains(prefix))
+      .getOrElse(throw new IllegalStateException(
+        s"Could not derive a prefix-only key from record keys $recordKeys"))
+    assertTrue(
+      readRecordIndex(metadata, util.Collections.singletonList(prefixOnlyKey), HOption.of("partition1")).isEmpty,
+      s"Prefix-only key '$prefixOnlyKey' must not match any record in the partitioned record index")
+  }
+
@ParameterizedTest
@MethodSource(Array("testArgsForPartitionedRecordLevelIndex"))
def testPartitionedRecordLevelIndexInitializationBasic(testCase:
TestPartitionedRecordLevelIndexTestCase): Unit = {