This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3847b44b0d81 [HUDI-9656] Rename index lookup components (#13647)
3847b44b0d81 is described below
commit 3847b44b0d81585a862bcd4168370890f9f59186
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Tue Jul 29 19:45:11 2025 -0700
[HUDI-9656] Rename index lookup components (#13647)
---
.../hudi/client/TestJavaHoodieBackedMetadata.java | 21 +-
.../client/utils/SparkMetadataWriterUtils.java | 4 +-
.../hudi/index/SparkMetadataTableRecordIndex.java | 2 +-
.../hudi/common/table/view/NoOpTableMetadata.java | 14 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 77 +++---
.../hudi/metadata/BloomFilterIndexRawKey.java | 75 ++++++
.../metadata/ColumnStatsIndexPrefixRawKey.java | 95 ++++++++
.../hudi/metadata/ColumnStatsIndexRawKey.java | 87 +++++++
.../metadata/FileSystemBackedTableMetadata.java | 13 +-
.../org/apache/hudi/metadata/FilesIndexRawKey.java | 66 ++++++
.../hudi/metadata/HoodieBackedTableMetadata.java | 263 +++++++--------------
.../apache/hudi/metadata/HoodieTableMetadata.java | 28 +--
.../hudi/metadata/HoodieTableMetadataUtil.java | 30 ++-
.../main/java/org/apache/hudi/metadata/RawKey.java | 33 +++
.../apache/hudi/metadata/RecordIndexRawKey.java | 65 +++++
.../hudi/metadata/SecondaryIndexKeyUtils.java | 63 ++---
.../hudi/metadata/SecondaryIndexPrefixRawKey.java | 64 +++++
.../TestHoodieBackedTableMetadataDataCleanup.java | 20 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 33 +--
.../hudi/metadata/TestSecondaryIndexKeyUtils.java | 5 +-
.../apache/hudi/source/stats/FileStatsIndex.java | 13 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 12 +-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 22 +-
.../org/apache/hudi/ExpressionIndexSupport.scala | 54 ++---
.../apache/hudi/PartitionStatsIndexSupport.scala | 10 +-
.../org/apache/hudi/RecordLevelIndexSupport.scala | 2 +-
.../org/apache/hudi/SecondaryIndexSupport.scala | 2 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 48 ++--
.../hudi/functional/RecordLevelIndexTestBase.scala | 4 +-
.../hudi/functional/TestMetadataRecordIndex.scala | 2 +-
.../hudi/feature/index/TestSecondaryIndex.scala | 3 +-
.../testHoodieBackedTableMetadataIndexLookup.scala | 26 +-
.../utilities/HoodieMetadataTableValidator.java | 2 +-
33 files changed, 825 insertions(+), 433 deletions(-)
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index f74b92d36e4b..4f23b94ba6a9 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -78,8 +78,8 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -164,7 +164,6 @@ import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
@@ -1472,10 +1471,9 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieTableMetadata tableMetadata = metadata(client);
// prefix search for column (_hoodie_record_key)
- ColumnIndexID columnIndexID = new
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result =
tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
+ HoodieListData.lazy(Collections.singletonList(new
ColumnStatsIndexPrefixRawKey(HoodieRecord.RECORD_KEY_METADATA_FIELD))),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should
be 6.
assertEquals(result.size(), 6);
@@ -1486,8 +1484,10 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
// prefix search for col(_hoodie_record_key) and first partition. only 2
files should be matched
PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
+ HoodieListData.lazy(HoodieTableMetadataUtil.generateColumnStatsKeys(
+
Collections.singletonList(HoodieRecord.RECORD_KEY_METADATA_FIELD),
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -1504,10 +1504,11 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
});
// prefix search for column {commit time} and first partition
- columnIndexID = new
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result = tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
+ HoodieListData.lazy(HoodieTableMetadataUtil.generateColumnStatsKeys(
+
Collections.singletonList(HoodieRecord.COMMIT_TIME_METADATA_FIELD),
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 2f4d561e96da..90e56adbe16c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -108,7 +108,6 @@ import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_
import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemViewForMetadataTable;
@@ -403,7 +402,8 @@ public class SparkMetadataWriterUtils {
// Fetch EI column stat records for above files
List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
partitionName)), indexPartition, false, IDENTITY_ENCODING)
+
HoodieListData.lazy(HoodieTableMetadataUtil.generateColumnStatsKeys(validColumnsToIndex,
partitionName)),
+ indexPartition, false)
// schema and properties are ignored in getInsertValue, so
simply pass as null
.map(record -> record.getData().getInsertValue(null, null))
.filter(Option::isPresent)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
index b37a0506f766..cf9d5c7690e7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
@@ -171,7 +171,7 @@ public class SparkMetadataTableRecordIndex extends
HoodieIndex<Object, Object> {
// recordIndexInfo object only contains records that are present in
record_index.
HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
-
hoodieTable.getMetadataTable().readRecordIndex(HoodieListData.eager(keysToLookup));
+
hoodieTable.getMetadataTable().readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup));
try {
Map<String, HoodieRecordGlobalLocation> recordIndexInfo =
HoodieDataUtils.dedupeAndCollectAsMap(recordIndexData);
return recordIndexInfo.entrySet().stream()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 6c7cc33f934f..5908d805d773 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.util.Option;
@@ -34,6 +33,7 @@ import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.RawKey;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -105,20 +105,20 @@ class NoOpTableMetadata implements HoodieTableMetadata {
}
@Override
- public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndex(HoodieData<String> recordKeys) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys) {
throw new HoodieMetadataException("Unsupported operation:
readRecordIndex!");
}
@Override
- public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String
partitionName) {
return HoodieListPairData.eager(Collections.emptyMap());
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-
String partitionName,
-
boolean shouldLoadInMemory,
-
SerializableFunctionUnchecked<String, String> keyEncoder) {
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(
+ HoodieData<? extends RawKey> rawKeys,
+ String partitionName,
+ boolean shouldLoadInMemory) {
throw new HoodieMetadataException("Unsupported operation:
getRecordsByKeyPrefixes!");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index d98230b503c8..49178158034f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -38,9 +37,6 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
-import org.apache.hudi.common.util.hash.FileIndexID;
-import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
@@ -59,15 +55,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
-
/**
* Abstract class for implementing common table metadata operations.
*/
@@ -200,23 +192,21 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
}
HoodieTimer timer = HoodieTimer.start();
- Set<String> partitionIDFileIDStrings = new HashSet<>();
Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ List<BloomFilterIndexRawKey> bloomFilterKeys = new ArrayList<>();
partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
- final String bloomFilterIndexKey =
HoodieMetadataPayload.getBloomFilterIndexKey(
- new
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
new FileIndexID(partitionNameFileNamePair.getRight()));
- partitionIDFileIDStrings.add(bloomFilterIndexKey);
- fileToKeyMap.put(bloomFilterIndexKey, partitionNameFileNamePair);
+ BloomFilterIndexRawKey rawKey = new
BloomFilterIndexRawKey(partitionNameFileNamePair.getLeft(),
partitionNameFileNamePair.getRight());
+ bloomFilterKeys.add(rawKey);
+ fileToKeyMap.put(rawKey.encode(), partitionNameFileNamePair);
});
- List<String> partitionIDFileIDStringsList = new
ArrayList<>(partitionIDFileIDStrings);
HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
- getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList),
metadataPartitionName, IDENTITY_ENCODING);
+ readIndexRecordsWithKeys(HoodieListData.eager(bloomFilterKeys),
metadataPartitionName);
Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords;
try {
hoodieRecords = HoodieDataUtils.dedupeAndCollectAsMap(recordsData);
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
timer.endTimer()));
- metrics.ifPresent(m ->
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR,
partitionIDFileIDStringsList.size()));
+ metrics.ifPresent(m ->
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR,
bloomFilterKeys.size()));
} finally {
recordsData.unpersistWithDependencies();
}
@@ -261,8 +251,8 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
return Collections.emptyMap();
}
- Map<String, Pair<String, String>> columnStatKeyToFileNameMap =
computeColStatKeyToFileName(partitionNameFileNameList, columnNames);
- return computeFileToColumnStatsMap(columnStatKeyToFileNameMap);
+ Pair<List<ColumnStatsIndexRawKey>, Map<String, Pair<String, String>>>
rawKeysAndMap = computeColStatRawKeys(partitionNameFileNameList, columnNames);
+ return computeFileToColumnStatsMap(rawKeysAndMap.getLeft(),
rawKeysAndMap.getRight());
}
/**
@@ -270,7 +260,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
*/
protected List<String> fetchAllPartitionPaths() {
HoodieTimer timer = HoodieTimer.start();
- Option<HoodieRecord<HoodieMetadataPayload>> recordOpt =
getRecordByKey(RECORDKEY_PARTITION_LIST,
+ Option<HoodieRecord<HoodieMetadataPayload>> recordOpt =
readFilesIndexRecords(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
@@ -302,7 +292,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
String recordKey = relativePartitionPath.isEmpty() ? NON_PARTITIONED_NAME
: relativePartitionPath;
HoodieTimer timer = HoodieTimer.start();
- Option<HoodieRecord<HoodieMetadataPayload>> recordOpt =
getRecordByKey(recordKey,
+ Option<HoodieRecord<HoodieMetadataPayload>> recordOpt =
readFilesIndexRecords(recordKey,
MetadataPartitionType.FILES.getPartitionPath());
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
@@ -334,9 +324,12 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
);
HoodieTimer timer = HoodieTimer.start();
+ List<FilesIndexRawKey> filesKeys = partitionIdToPathMap.keySet().stream()
+ .map(FilesIndexRawKey::new)
+ .collect(Collectors.toList());
HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
- getRecordsByKeys(HoodieListData.eager(new
ArrayList<>(partitionIdToPathMap.keySet())),
- MetadataPartitionType.FILES.getPartitionPath(), IDENTITY_ENCODING);
+ readIndexRecordsWithKeys(HoodieListData.eager(filesKeys),
+ MetadataPartitionType.FILES.getPartitionPath());
Map<String, HoodieRecord<HoodieMetadataPayload>> partitionIdRecordPairs;
try {
partitionIdRecordPairs =
HoodieDataUtils.dedupeAndCollectAsMap(recordsData);
@@ -366,39 +359,43 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
}
/**
- * Computes a map from col-stats key to partition and file name pair.
+ * Computes raw keys and metadata for column stats lookup.
*
* @param partitionNameFileNameList - List of partition and file name pair
for which bloom filters need to be retrieved.
* @param columnNames - List of column name for which stats are needed.
+ * @return Pair of raw keys list and a map from encoded key to
partition/file pair
*/
- private Map<String, Pair<String, String>> computeColStatKeyToFileName(
+ private Pair<List<ColumnStatsIndexRawKey>, Map<String, Pair<String,
String>>> computeColStatRawKeys(
final List<Pair<String, String>> partitionNameFileNameList,
final List<String> columnNames) {
+ List<ColumnStatsIndexRawKey> rawKeys = new ArrayList<>();
Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new
HashMap<>();
+
for (String columnName : columnNames) {
- final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
for (Pair<String, String> partitionNameFileNamePair :
partitionNameFileNameList) {
- final String columnStatsIndexKey =
HoodieMetadataPayload.getColumnStatsIndexKey(
- new
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
- new FileIndexID(partitionNameFileNamePair.getRight()),
- columnIndexID);
- columnStatKeyToFileNameMap.put(columnStatsIndexKey,
partitionNameFileNamePair);
+ ColumnStatsIndexRawKey rawKey = new ColumnStatsIndexRawKey(
+ partitionNameFileNamePair.getLeft(),
+ partitionNameFileNamePair.getRight(),
+ columnName);
+ rawKeys.add(rawKey);
+ columnStatKeyToFileNameMap.put(rawKey.encode(),
partitionNameFileNamePair);
}
}
- return columnStatKeyToFileNameMap;
+ return Pair.of(rawKeys, columnStatKeyToFileNameMap);
}
/**
* Computes the map from partition and file name pair to
HoodieMetadataColumnStats record.
*
+ * @param rawKeys - List of raw keys for column stats
* @param columnStatKeyToFileNameMap - A map from col-stats key to partition
and file name pair.
*/
- private Map<Pair<String, String>, List<HoodieMetadataColumnStats>>
computeFileToColumnStatsMap(Map<String, Pair<String, String>>
columnStatKeyToFileNameMap) {
- List<String> columnStatKeylist = new
ArrayList<>(columnStatKeyToFileNameMap.keySet());
+ private Map<Pair<String, String>, List<HoodieMetadataColumnStats>>
computeFileToColumnStatsMap(
+ List<ColumnStatsIndexRawKey> rawKeys, Map<String, Pair<String, String>>
columnStatKeyToFileNameMap) {
HoodieTimer timer = HoodieTimer.start();
HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
- getRecordsByKeys(
- HoodieListData.eager(columnStatKeylist),
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), IDENTITY_ENCODING);
+ readIndexRecordsWithKeys(
+ HoodieListData.eager(rawKeys),
MetadataPartitionType.COLUMN_STATS.getPartitionPath());
Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords;
try {
hoodieRecords = HoodieDataUtils.dedupeAndCollectAsMap(recordsData);
@@ -443,17 +440,17 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
* @param partitionName The partition name where the record is stored
* @return Option containing the record if found, empty Option if not found
*/
- protected abstract Option<HoodieRecord<HoodieMetadataPayload>>
getRecordByKey(String key, String partitionName);
+ protected abstract Option<HoodieRecord<HoodieMetadataPayload>>
readFilesIndexRecords(String key, String partitionName);
/**
* Retrieves a collection of pairs (key -> record) from the metadata table
by its keys.
*
- * @param keys The to look up in the metadata table
+ * @param rawKeys The raw keys to look up in the metadata table
* @param partitionName The partition name where the records are stored
* @return A collection of pairs (key -> record)
*/
- public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(
- HoodieData<String> keys, String partitionName,
SerializableFunctionUnchecked<String, String> keyEncodingFn);
+ public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
readIndexRecordsWithKeys(
+ HoodieData<? extends RawKey> rawKeys, String partitionName);
/**
* Returns a collection of pairs (secondary-key -> record-keys) for the
provided secondary keys.
@@ -463,7 +460,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
* @return A collection of pairs where each key is a secondary key and the
value is record key that are indexed by that secondary key.
* If a secondary key value is mapped to different record keys, they are
tracked as multiple pairs for each of them.
*/
- public abstract HoodiePairData<String, String>
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
+ public abstract HoodiePairData<String, String>
readSecondaryIndexDataTableRecordKeysWithKeys(HoodieData<String> keys, String
partitionName);
public HoodieMetadataConfig getMetadataConfig() {
return metadataConfig;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BloomFilterIndexRawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BloomFilterIndexRawKey.java
new file mode 100644
index 000000000000..911a3d78380d
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/BloomFilterIndexRawKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for bloom filter metadata.
+ */
+public class BloomFilterIndexRawKey implements RawKey {
+ private final String partitionName;
+ private final String fileName;
+
+ public BloomFilterIndexRawKey(String partitionName, String fileName) {
+ this.partitionName = Objects.requireNonNull(partitionName);
+ this.fileName = Objects.requireNonNull(fileName);
+ }
+
+ @Override
+ public String encode() {
+ return HoodieMetadataPayload.getBloomFilterIndexKey(
+ new
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionName)),
+ new FileIndexID(fileName));
+ }
+
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BloomFilterIndexRawKey that = (BloomFilterIndexRawKey) o;
+ return Objects.equals(partitionName, that.partitionName) &&
Objects.equals(fileName, that.fileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionName, fileName);
+ }
+
+ @Override
+ public String toString() {
+ return "BloomFilterRawKey{" + "partitionName='" + partitionName + '\'' +
", fileName='" + fileName + '\'' + '}';
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexPrefixRawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexPrefixRawKey.java
new file mode 100644
index 000000000000..cb20d6201953
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexPrefixRawKey.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for column stats index consisting of column name and
optional partition name.
+ */
+public class ColumnStatsIndexPrefixRawKey implements RawKey {
+ private static final long serialVersionUID = 1L;
+
+ private final String columnName;
+ private final Option<String> partitionName;
+
+ public ColumnStatsIndexPrefixRawKey(String columnName) {
+ this(columnName, Option.empty());
+ }
+
+ public ColumnStatsIndexPrefixRawKey(String columnName, String partitionName)
{
+ this(columnName, Option.of(partitionName));
+ }
+
+ public ColumnStatsIndexPrefixRawKey(String columnName, Option<String>
partitionName) {
+ this.columnName = Objects.requireNonNull(columnName, "Column name cannot
be null");
+ this.partitionName = Objects.requireNonNull(partitionName, "Partition name
option cannot be null");
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public Option<String> getPartitionName() {
+ return partitionName;
+ }
+
+ @Override
+ public String encode() {
+ ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+ String encodedValue;
+
+ if (partitionName.isPresent()) {
+ PartitionIndexID partitionIndexId = new PartitionIndexID(
+
HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName.get()));
+ encodedValue = columnIndexID.asBase64EncodedString()
+ .concat(partitionIndexId.asBase64EncodedString());
+ } else {
+ encodedValue = columnIndexID.asBase64EncodedString();
+ }
+
+ return encodedValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ColumnStatsIndexPrefixRawKey that = (ColumnStatsIndexPrefixRawKey) o;
+ return Objects.equals(columnName, that.columnName) &&
Objects.equals(partitionName, that.partitionName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnName, partitionName);
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnStatsIndexKey{columnName='" + columnName + "',
partitionName=" + partitionName + "}";
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexRawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexRawKey.java
new file mode 100644
index 000000000000..3cab656a8275
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/ColumnStatsIndexRawKey.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for column stats indexed by partition, file and column.
+ * This is different from ColumnStatsIndexPrefixRawKey which is used for
prefix lookups.
+ */
+public class ColumnStatsIndexRawKey implements RawKey {
+ private final String partitionName;
+ private final String fileName;
+ private final String columnName;
+
+ public ColumnStatsIndexRawKey(String partitionName, String fileName, String
columnName) {
+ this.partitionName = Objects.requireNonNull(partitionName);
+ this.fileName = Objects.requireNonNull(fileName);
+ this.columnName = Objects.requireNonNull(columnName);
+ }
+
+ @Override
+ public String encode() {
+ return HoodieMetadataPayload.getColumnStatsIndexKey(
+ new
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName)),
+ new FileIndexID(fileName),
+ new ColumnIndexID(columnName));
+ }
+
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ColumnStatsIndexRawKey that = (ColumnStatsIndexRawKey) o;
+ return Objects.equals(partitionName, that.partitionName)
+ && Objects.equals(fileName, that.fileName)
+ && Objects.equals(columnName, that.columnName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionName, fileName, columnName);
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnStatsFileRawKey{" + "partitionName='" + partitionName + '\''
+ + ", fileName='" + fileName + '\'' + ", columnName='" + columnName +
'\'' + '}';
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 6b01180696af..2645ef397b93 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
@@ -311,20 +310,20 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-
String partitionName,
-
boolean shouldLoadInMemory,
-
SerializableFunctionUnchecked<String, String> keyEncoder) {
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(
+ HoodieData<? extends RawKey> rawKeys,
+ String partitionName,
+ boolean shouldLoadInMemory) {
throw new HoodieMetadataException("Unsupported operation:
getRecordsByKeyPrefixes!");
}
@Override
- public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndex(HoodieData<String> recordKeys) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys) {
throw new HoodieMetadataException("Unsupported operation:
readRecordIndex!");
}
@Override
- public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String
partitionName) {
throw new HoodieMetadataException("Unsupported operation:
readSecondaryIndex!");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FilesIndexRawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FilesIndexRawKey.java
new file mode 100644
index 000000000000..78fb9a70536a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FilesIndexRawKey.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.Objects;
+
+/**
+ * Represents a raw key for the FILES partition in the metadata table.
+ * This uses identity encoding - the key is used as-is without transformation.
+ */
+public class FilesIndexRawKey implements RawKey {
+ private final String key;
+
+ public FilesIndexRawKey(String key) {
+ this.key = Objects.requireNonNull(key);
+ }
+
+ @Override
+ public String encode() {
+ // Identity encoding - return the key as-is
+ return key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FilesIndexRawKey that = (FilesIndexRawKey) o;
+ return Objects.equals(key, that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key);
+ }
+
+ @Override
+ public String toString() {
+ return "FilesPartitionRawKey{" + "key='" + key + '\'' + '}';
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 77af1e409d94..c196a6912096 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ClosableSortedDedupingIterator;
+import org.apache.hudi.common.util.collection.CloseableFilterIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -99,8 +100,6 @@ import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BAS
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
-import static
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
@@ -174,9 +173,9 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
@Override
- protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key, String partitionName) {
- HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
getRecordsByKeys(
- HoodieListData.eager(Collections.singletonList(key)), partitionName,
IDENTITY_ENCODING);
+ protected Option<HoodieRecord<HoodieMetadataPayload>>
readFilesIndexRecords(String key, String partitionName) {
+ HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> recordsData =
readIndexRecordsWithKeys(
+ HoodieListData.eager(Collections.singletonList(new
FilesIndexRawKey(key))), partitionName);
try {
List<HoodieRecord<HoodieMetadataPayload>> records =
recordsData.values().collectAsList();
ValidationUtils.checkArgument(records.size() <= 1, () -> "Found more
than 1 record for record key " + key);
@@ -222,12 +221,12 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-
String partitionName,
-
boolean shouldLoadInMemory,
-
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(
+ HoodieData<? extends RawKey> rawKeys,
+ String partitionName,
+ boolean shouldLoadInMemory) {
// Apply key encoding
- List<String> sortedKeyPrefixes = new
ArrayList<>(keyPrefixes.map(keyEncodingFn::apply).collectAsList());
+ List<String> sortedKeyPrefixes = new ArrayList<>(rawKeys.map(key ->
key.encode()).collectAsList());
// Sort the prefixes so that keys are looked up in order
// Sort must come after encoding.
Collections.sort(sortedKeyPrefixes);
@@ -243,13 +242,14 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
getEngineContext().parallelize(partitionFileSlices))
.flatMap(
(SerializableFunction<FileSlice,
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice ->
- lookupRecords(partitionName, sortedKeyPrefixes, fileSlice,
+ readSliceAndFilterByKeysIntoList(partitionName,
sortedKeyPrefixes, fileSlice,
metadataRecord -> {
HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
String rowKey = payload.key != null ? payload.key :
metadataRecord.get(KEY_FIELD_NAME).toString();
HoodieKey key = new HoodieKey(rowKey, partitionName);
return new HoodieAvroRecord<>(key, payload);
- }, false));
+ }, false))
+ .filter(r -> !r.getData().isDeleted());
}
/**
@@ -259,88 +259,24 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
* 3. [lookup within file groups] lookup the key in the file group
* 4. the record is returned
*/
- /**
- * Performs lookup of records in the metadata table.
- *
- * @param keys The keys to look up in the metadata table
- * @param partitionName The name of the metadata partition to search in
- * @param fileSlices The list of file slices to search through
- * @param keyEncodingFn Optional function to encode keys before lookup
- * @return Pair data containing the looked up records keyed by their
original keys
- */
- private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
doLookup(HoodieData<String> keys, String partitionName, List<FileSlice>
fileSlices,
-
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
- final int numFileSlices = fileSlices.size();
- if (numFileSlices == 1) {
- List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
- TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
- return lookupKeyRecordPairs(partitionName, new
ArrayList<>(distinctSortedKeys), fileSlices.get(0));
- }
-
- // For SI v2, there are 2 cases require different implementation:
- // SI write path concatenates secKey$recordKey, the secKey needs extracted
for hashing;
- // SI read path gives secKey only, no need for secKey extraction.
- SerializableBiFunction<String, Integer, Integer> mappingFunction =
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
- keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction, keyEncodingFn);
- HoodiePairData<Integer, String> persistedInitialPairData = keys
- // Tag key with file group index
- .mapToPair(recordKey -> {
- String encodedKey = keyEncodingFn.apply(recordKey);
- // Always encode the key before apply mapping.
- return new ImmutablePair<>(mappingFunction.apply(encodedKey,
numFileSlices), encodedKey);
- });
- persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
- dataCleanupManager.trackPersistedData(persistedInitialPairData);
- // Use the new processValuesOfTheSameShards API instead of explicit
rangeBasedRepartitionForEachKey
- SerializableFunction<Iterator<String>, Iterator<Pair<String,
HoodieRecord<HoodieMetadataPayload>>>> processFunction =
- sortedKeys -> {
- List<String> keysList = new ArrayList<>();
- // Decorate with sorted stream deduplication.
- try (ClosableSortedDedupingIterator<String> distinctSortedKeyIter =
new ClosableSortedDedupingIterator<>(sortedKeys)) {
- if (!distinctSortedKeyIter.hasNext()) {
- return Collections.emptyIterator();
- }
- distinctSortedKeyIter.forEachRemaining(keysList::add);
- }
- FileSlice fileSlice =
fileSlices.get(mappingFunction.apply(keysList.get(0), numFileSlices));
- return lookupKeyRecordPairsItr(partitionName, keysList, fileSlice);
- };
-
- List<Integer> keySpace = IntStream.range(0,
numFileSlices).boxed().collect(Collectors.toList());
- HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> result =
- getEngineContext().mapGroupsByKey(persistedInitialPairData,
processFunction, keySpace, true)
- .mapToPair(p -> Pair.of(p.getLeft(), p.getRight()));
- return result.filter((String k, HoodieRecord<HoodieMetadataPayload> v) ->
!v.getData().isDeleted());
- }
-
- /**
- * All keys to be looked up go through the following steps:
- * 1. [encode] escape/encode the key if needed
- * 2. [hash to file group] compute the hash of the key to
- * 3. [lookup within file groups] lookup the key in the file group
- * 4. the record is returned
- */
- private HoodieData<HoodieRecord<HoodieMetadataPayload>>
doLookupIndexRecords(HoodieData<String> keys, String partitionName,
List<FileSlice> fileSlices,
-
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+ private HoodieData<HoodieRecord<HoodieMetadataPayload>>
lookupIndexRecords(HoodieData<String> keys, String partitionName,
List<FileSlice> fileSlices) {
boolean isSecondaryIndex =
MetadataPartitionType.fromPartitionPath(partitionName).equals(MetadataPartitionType.SECONDARY_INDEX);
final int numFileSlices = fileSlices.size();
if (numFileSlices == 1) {
- List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
+ List<String> keysList = keys.collectAsList();
TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
- return lookupRecords(partitionName, new ArrayList<>(distinctSortedKeys),
fileSlices.get(0), !isSecondaryIndex);
+ return readSliceAndFilterByKeysIntoList(partitionName, new
ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex);
}
// For SI v2, there are 2 cases require different implementation:
// SI write path concatenates secKey$recordKey, the secKey needs extracted
for hashing;
// SI read path gives secKey only, no need for secKey extraction.
SerializableBiFunction<String, Integer, Integer> mappingFunction =
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
- keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction, keyEncodingFn);
+ keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction);
HoodiePairData<Integer, String> persistedInitialPairData = keys
// Tag key with file group index
- .mapToPair(recordKey -> {
- String encodedKey = keyEncodingFn.apply(recordKey);
- // Always encode the key before apply mapping.
+ .mapToPair(encodedKey -> {
return new ImmutablePair<>(mappingFunction.apply(encodedKey,
numFileSlices), encodedKey);
});
persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
@@ -361,10 +297,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return lookupRecordsItr(partitionName, keysList, fileSlice,
!isSecondaryIndex);
};
List<Integer> keySpace = IntStream.range(0,
numFileSlices).boxed().collect(Collectors.toList());
- HoodieData<HoodieRecord<HoodieMetadataPayload>> result =
- getEngineContext().mapGroupsByKey(persistedInitialPairData,
processFunction, keySpace, true);
-
- return result.filter((HoodieRecord<HoodieMetadataPayload> v) ->
!v.getData().isDeleted());
+ return getEngineContext().mapGroupsByKey(persistedInitialPairData,
processFunction, keySpace, true);
}
/**
@@ -375,7 +308,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
* @param recordKeys List of mapping from keys to the record location.
*/
@Override
- public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndex(HoodieData<String> recordKeys) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys) {
// If record index is not initialized yet, we cannot return an empty
result here unlike the code for reading from other
// indexes. This is because results from this function are used for
upserts and returning an empty result here would lead
// to existing records being inserted again causing duplicates.
@@ -385,7 +318,8 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return dataCleanupManager.ensureDataCleanupOnException(v -> {
// TODO [HUDI-9544]: Metric does not work for rdd based API due to lazy
evaluation.
- return getRecordsByKeys(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
+ HoodieData<RecordIndexRawKey> rawKeys =
recordKeys.map(RecordIndexRawKey::new);
+ return readIndexRecordsWithKeys(rawKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath())
.mapToPair((Pair<String, HoodieRecord<HoodieMetadataPayload>> p) ->
Pair.of(p.getLeft(), p.getRight().getData().getRecordGlobalLocation()));
});
}
@@ -406,10 +340,11 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
"Record index is not initialized in MDT");
- return dataCleanupManager.ensureDataCleanupOnException(v ->
- readIndexRecords(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
- .map(r -> r.getData().getRecordGlobalLocation())
- );
+ return dataCleanupManager.ensureDataCleanupOnException(v -> {
+ HoodieData<RecordIndexRawKey> rawKeys =
recordKeys.map(RecordIndexRawKey::new);
+ return readIndexRecords(rawKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath())
+ .map(r -> r.getData().getRecordGlobalLocation());
+ });
}
/**
@@ -420,14 +355,14 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
* @param secondaryKeys The list of secondary keys to read
*/
@Override
- public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String
partitionName) {
HoodieIndexVersion indexVersion =
existingIndexVersionOrDefault(partitionName, dataMetaClient);
return dataCleanupManager.ensureDataCleanupOnException(v -> {
if (indexVersion.equals(HoodieIndexVersion.V1)) {
- return readSecondaryIndexV1(secondaryKeys, partitionName);
+ return readSecondaryIndexLocationsWithKeysV1(secondaryKeys,
partitionName);
} else if (indexVersion.equals(HoodieIndexVersion.V2)) {
- return readSecondaryIndexV2(secondaryKeys, partitionName);
+ return readSecondaryIndexLocationsWithKeysV2(secondaryKeys,
partitionName);
} else {
throw new IllegalArgumentException("readSecondaryIndex does not
support index with version " + indexVersion);
}
@@ -447,9 +382,9 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return dataCleanupManager.ensureDataCleanupOnException(v -> {
if (indexVersion.equals(HoodieIndexVersion.V1)) {
- return readSecondaryIndexV1(secondaryKeys, partitionName).values();
+ return readSecondaryIndexLocationsWithKeysV1(secondaryKeys,
partitionName).values();
} else if (indexVersion.equals(HoodieIndexVersion.V2)) {
- return
readRecordIndexLocations(getRecordKeysFromSecondaryKeysV2(secondaryKeys,
partitionName));
+ return
readRecordIndexLocations(readSecondaryIndexDataTableRecordKeysV2(secondaryKeys,
partitionName));
} else {
throw new IllegalArgumentException("readSecondaryIndexResult does not
support index with version " + indexVersion);
}
@@ -457,27 +392,25 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
@Override
- public HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(
- HoodieData<String> keys, String partitionName,
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
- List<FileSlice> fileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
- k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
- checkState(!fileSlices.isEmpty(), () -> "No file slices found for
partition: " + partitionName);
-
- return doLookup(keys, partitionName, fileSlices, keyEncodingFn);
+ public HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
readIndexRecordsWithKeys(
+ HoodieData<? extends RawKey> rawKeys, String partitionName) {
+ return readIndexRecords(rawKeys, partitionName)
+ .mapToPair(record -> Pair.of(record.getRecordKey(), record));
}
- public HoodieData<String>
getRecordKeysFromSecondaryKeysV2(HoodieData<String> secondaryKeys, String
partitionName) {
- return dataCleanupManager.ensureDataCleanupOnException(v ->
- readIndexRecords(secondaryKeys, partitionName,
SecondaryIndexKeyUtils::escapeSpecialChars).map(
- hoodieRecord ->
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(hoodieRecord.getRecordKey()))
- );
+ public HoodieData<String>
readSecondaryIndexDataTableRecordKeysV2(HoodieData<String> secondaryKeys,
String partitionName) {
+ return dataCleanupManager.ensureDataCleanupOnException(v -> {
+ HoodieData<SecondaryIndexPrefixRawKey> rawKeys =
secondaryKeys.map(SecondaryIndexPrefixRawKey::new);
+ return readIndexRecords(rawKeys, partitionName)
+ .map(hoodieRecord ->
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(hoodieRecord.getRecordKey()));
+ });
}
- private HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexV2(HoodieData<String> secondaryKeys, String partitionName) {
- return readRecordIndex(getRecordKeysFromSecondaryKeysV2(secondaryKeys,
partitionName));
+ private HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexLocationsWithKeysV2(HoodieData<String> secondaryKeys, String
partitionName) {
+ return
readRecordIndexLocationsWithKeys(readSecondaryIndexDataTableRecordKeysV2(secondaryKeys,
partitionName));
}
- private HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexV1(HoodieData<String> secondaryKeys, String partitionName) {
+ private HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexLocationsWithKeysV1(HoodieData<String> secondaryKeys, String
partitionName) {
// For secondary index v1 we keep the old implementation.
ValidationUtils.checkState(secondaryKeys instanceof HoodieListData,
"readSecondaryIndex only support HoodieListData at the moment");
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
@@ -487,34 +420,34 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
() -> "Secondary index is not initialized in MDT for: " +
partitionName);
// Fetch secondary-index records
Map<String, Set<String>> secondaryKeyRecords =
HoodieDataUtils.collectPairDataAsMap(
-
getSecondaryIndexRecords(HoodieListData.eager(secondaryKeys.collectAsList()),
partitionName));
+
readSecondaryIndexDataTableRecordKeysWithKeys(HoodieListData.eager(secondaryKeys.collectAsList()),
partitionName));
// Now collect the record-keys and fetch the RLI records
List<String> recordKeys = new ArrayList<>();
secondaryKeyRecords.values().forEach(recordKeys::addAll);
- return readRecordIndex(HoodieListData.eager(recordKeys));
+ return readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys));
}
- protected HoodieData<HoodieRecord<HoodieMetadataPayload>>
readIndexRecords(HoodieData<String> keys,
-
String partitionName,
-
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+ protected HoodieData<HoodieRecord<HoodieMetadataPayload>>
readIndexRecords(HoodieData<? extends RawKey> rawKeys,
+
String partitionName) {
List<FileSlice> fileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
checkState(!fileSlices.isEmpty(), "No file slices found for partition: " +
partitionName);
- return doLookupIndexRecords(keys, partitionName, fileSlices,
keyEncodingFn);
+
+ // Convert RawKey to String using encode()
+ HoodieData<String> keys = rawKeys.map(key -> key.encode());
+ return lookupIndexRecords(keys, partitionName, fileSlices);
}
// When testing we noticed that the parallelism can be very low which hurts
the performance. so we should start with a reasonable
// level of parallelism in that case.
private HoodieData<String> repartitioningIfNeeded(
- HoodieData<String> keys, String partitionName, int numFileSlices,
SerializableBiFunction<String, Integer, Integer> mappingFunction,
- SerializableFunctionUnchecked<String, String> keyEncodingFn) {
+ HoodieData<String> keys, String partitionName, int numFileSlices,
SerializableBiFunction<String, Integer, Integer> mappingFunction) {
if (keys instanceof HoodieListData) {
int parallelism;
- parallelism = (int) keys.map(k ->
mappingFunction.apply(keyEncodingFn.apply(k),
numFileSlices)).distinct().count();
+ parallelism = (int) keys.map(k -> mappingFunction.apply(k,
numFileSlices)).distinct().count();
// In case of empty lookup set, we should avoid RDD with 0 partitions.
parallelism = Math.max(parallelism, 1);
- LOG.info("Repartitioning keys for partition {} from list data with
parallelism: {}",
- partitionName, parallelism);
+ LOG.info("Repartitioning keys for partition {} from list data with
parallelism: {}", partitionName, parallelism);
keys = getEngineContext().parallelize(keys.collectAsList(), parallelism);
} else if (keys.getNumPartitions() <
metadataConfig.getRepartitionMinPartitionsThreshold()) {
LOG.info("Repartitioning keys for partition {} to {} partitions",
partitionName, metadataConfig.getRepartitionDefaultPartitions());
@@ -599,21 +532,10 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return FileGroupRecordBufferLoader.createReusable(readerContext);
}
- private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
lookupKeyRecordPairs(String partitionName,
+ private HoodieData<HoodieRecord<HoodieMetadataPayload>>
readSliceAndFilterByKeysIntoList(String partitionName,
List<String> sortedKeys,
-
FileSlice fileSlice) {
- Map<String, List<HoodieRecord<HoodieMetadataPayload>>> map = new
HashMap<>();
- try (ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>>
iterator =
- lookupKeyRecordPairsItr(partitionName, sortedKeys, fileSlice)) {
- iterator.forEachRemaining(entry -> map.put(entry.getKey(),
Collections.singletonList(entry.getValue())));
- }
- return HoodieListPairData.eager(map);
- }
-
- private HoodieData<HoodieRecord<HoodieMetadataPayload>> lookupRecords(String
partitionName,
-
List<String> sortedKeys,
-
FileSlice fileSlice,
-
boolean isFullKey) {
+
FileSlice fileSlice,
+
boolean isFullKey) {
List<HoodieRecord<HoodieMetadataPayload>> res = new ArrayList<>();
try (ClosableIterator<HoodieRecord<HoodieMetadataPayload>> iterator =
lookupRecordsItr(partitionName, sortedKeys, fileSlice, isFullKey)) {
iterator.forEachRemaining(res::add);
@@ -621,38 +543,41 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return HoodieListData.lazy(res);
}
- private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>>
lookupKeyRecordPairsItr(String partitionName,
-
List<String> sortedKeys,
-
FileSlice fileSlice) {
+ private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>>
readSliceAndFilterByKeys(String partitionName,
+
List<String> sortedKeys,
+
FileSlice fileSlice) {
boolean isSecondaryIndex =
MetadataPartitionType.fromPartitionPath(partitionName).equals(MetadataPartitionType.SECONDARY_INDEX);
- return lookupRecords(partitionName, sortedKeys, fileSlice, metadataRecord
-> {
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
- String rowKey = payload.key != null ? payload.key :
metadataRecord.get(KEY_FIELD_NAME).toString();
- HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
- return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
- }, !isSecondaryIndex);
+ return new CloseableFilterIterator<>(
+ readSliceAndFilterByKeysIntoList(partitionName, sortedKeys, fileSlice,
metadataRecord -> {
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
+ String rowKey = payload.key != null ? payload.key :
metadataRecord.get(KEY_FIELD_NAME).toString();
+ HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
+ return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
+ }, !isSecondaryIndex),
+ p -> !p.getValue().getData().isDeleted());
}
private ClosableIterator<HoodieRecord<HoodieMetadataPayload>>
lookupRecordsItr(String partitionName,
List<String> keys,
FileSlice fileSlice,
boolean isFullKey) {
- return lookupRecords(partitionName, keys, fileSlice,
- metadataRecord -> {
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
- return new HoodieAvroRecord<>(new HoodieKey(payload.key,
partitionName), payload);
- }, isFullKey);
+ return new CloseableFilterIterator<>(
+ readSliceAndFilterByKeysIntoList(partitionName, keys, fileSlice,
metadataRecord -> {
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
+ return new HoodieAvroRecord<>(new HoodieKey(payload.key,
partitionName), payload);
+ }, isFullKey),
+ r -> !r.getData().isDeleted());
}
/**
* Lookup records and produce a lazy iterator of mapped HoodieRecords.
* @param isFullKey If true, perform exact key match. If false, perform
prefix match.
*/
- private <T> ClosableIterator<T> lookupRecords(String partitionName,
- List<String> sortedKeys,
- FileSlice fileSlice,
-
SerializableFunctionUnchecked<GenericRecord, T> transformer,
- boolean isFullKey) {
+ private <T> ClosableIterator<T> readSliceAndFilterByKeysIntoList(String
partitionName,
+
List<String> sortedKeys,
+ FileSlice
fileSlice,
+
SerializableFunctionUnchecked<GenericRecord, T> transformer,
+ boolean
isFullKey) {
// If no keys to lookup, we must return early, otherwise, the hfile lookup
will return all records.
if (sortedKeys.isEmpty()) {
return new EmptyIterator<>();
@@ -693,16 +618,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
.map(Literal::from)
.collect(Collectors.toList()));
} else {
- // For secondary index we need an extra step of processing the key to
lookup before handing it over to filegroup reader.
- if (MetadataPartitionType.fromPartitionPath(partitionName)
- .equals(MetadataPartitionType.SECONDARY_INDEX)) {
- // For secondary index, always use prefix matching
- return Predicates.startsWithAny(null,
- sortedKeys.stream()
- .map(escapedKey -> escapedKey +
SECONDARY_INDEX_RECORD_KEY_SEPARATOR)
- .map(Literal::from)
- .collect(Collectors.toList()));
- }
// For non-secondary index with prefix matching
return Predicates.startsWithAny(null, sortedKeys.stream()
.map(Literal::from)
@@ -843,37 +758,37 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
@Override
- public HoodiePairData<String, String>
getSecondaryIndexRecords(HoodieData<String> secondaryKeys, String
partitionName) {
+ public HoodiePairData<String, String>
readSecondaryIndexDataTableRecordKeysWithKeys(HoodieData<String> secondaryKeys,
String partitionName) {
HoodieIndexVersion indexVersion =
existingIndexVersionOrDefault(partitionName, dataMetaClient);
return dataCleanupManager.ensureDataCleanupOnException(v -> {
if (indexVersion.equals(HoodieIndexVersion.V1)) {
- return getSecondaryIndexRecordsV1(secondaryKeys, partitionName);
+ return readSecondaryIndexDataTableRecordKeysWithKeysV1(secondaryKeys,
partitionName);
} else if (indexVersion.equals(HoodieIndexVersion.V2)) {
- return getSecondaryIndexRecordsV2(secondaryKeys, partitionName);
+ return readSecondaryIndexDataTableRecordKeysWithKeysV2(secondaryKeys,
partitionName);
} else {
throw new IllegalArgumentException("getSecondaryIndexRecords does not
support index with version " + indexVersion);
}
});
}
- private HoodiePairData<String, String>
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
+ private HoodiePairData<String, String>
readSecondaryIndexDataTableRecordKeysWithKeysV1(HoodieData<String> keys, String
partitionName) {
if (keys.isEmpty()) {
return HoodieListPairData.eager(Collections.emptyList());
}
- return getRecordsByKeyPrefixes(keys, partitionName, false,
SecondaryIndexKeyUtils::escapeSpecialChars)
- .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
+ HoodieData<SecondaryIndexPrefixRawKey> rawKeys =
keys.map(SecondaryIndexPrefixRawKey::new);
+ return getRecordsByKeyPrefixes(rawKeys, partitionName, false)
.mapToPair(hoodieRecord ->
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
}
- private HoodiePairData<String, String>
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String
partitionName) {
+ private HoodiePairData<String, String>
readSecondaryIndexDataTableRecordKeysWithKeysV2(HoodieData<String>
secondaryKeys, String partitionName) {
if (secondaryKeys.isEmpty()) {
return HoodieListPairData.eager(Collections.emptyList());
}
- return readIndexRecords(secondaryKeys, partitionName,
SecondaryIndexKeyUtils::escapeSpecialChars)
- .filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
+ HoodieData<SecondaryIndexPrefixRawKey> rawKeys =
secondaryKeys.map(SecondaryIndexPrefixRawKey::new);
+ return readIndexRecords(rawKeys, partitionName)
.mapToPair(hoodieRecord ->
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 8035d31af5c2..6323d13ffe1a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -230,42 +229,43 @@ public interface HoodieTableMetadata extends
Serializable, AutoCloseable {
* Returns pairs of (record key, location of record key) which are found in
the record index.
* Records that are not found are ignored and wont be part of map object
that is returned.
*/
- HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndex(HoodieData<String> recordKeys);
+ HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocationsWithKeys(HoodieData<String> recordKeys);
/**
* Returns the location of record keys which are found in the record index.
* Records that are not found are ignored and wont be part of map object
that is returned.
*/
default HoodieData<HoodieRecordGlobalLocation>
readRecordIndexLocations(HoodieData<String> recordKeys) {
- return readRecordIndex(recordKeys).values();
+ return readRecordIndexLocationsWithKeys(recordKeys).values();
}
/**
* Returns pairs of (secondary key, location of secondary key) which the
provided secondary keys maps to.
* Records that are not found are ignored and won't be part of map object
that is returned.
*/
- HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName);
+ HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndexLocationsWithKeys(HoodieData<String> secondaryKeys, String
partitionName);
/**
* Returns the location of secondary keys which are found in the secondary
index.
* Records that are not found are ignored and won't be part of map object
that is returned.
*/
default HoodieData<HoodieRecordGlobalLocation>
readSecondaryIndexLocations(HoodieData<String> secondaryKeys, String
partitionName) {
- return readSecondaryIndex(secondaryKeys, partitionName).values();
+ return readSecondaryIndexLocationsWithKeys(secondaryKeys,
partitionName).values();
}
/**
- * Fetch records by key prefixes. Key prefix passed is expected to match the
same prefix as stored in Metadata table partitions. For eg, in case of col
stats partition,
- * actual keys in metadata partition is encoded values of column name,
partition name and file name. So, key prefixes passed to this method is
expected to be encoded already.
+ * Fetch records by key prefixes. The raw keys are encoded using their
encode() method to generate
+ * the actual key prefixes used for lookup in the metadata table partitions.
*
- * @param keyPrefixes list of key prefixes for which interested records
are looked up for.
- * @param partitionName partition name in metadata table where the records
are looked up for.
- * @return {@link HoodieData} of {@link HoodieRecord}s with records matching
the passed in key prefixes.
+ * @param rawKeys list of raw key objects to be encoded into key
prefixes
+ * @param partitionName partition name in metadata table where the
records are looked up for
+ * @param shouldLoadInMemory whether to load records in memory
+ * @return {@link HoodieData} of {@link HoodieRecord}s with records matching
the encoded key prefixes
*/
- HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
-
String partitionName,
-
boolean shouldLoadInMemory,
-
SerializableFunctionUnchecked<String, String> keyEncoder);
+ HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(
+ HoodieData<? extends RawKey> rawKeys,
+ String partitionName,
+ boolean shouldLoadInMemory);
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 144f2a7be44a..6fef6d3febeb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -55,7 +55,6 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableBiFunction;
-import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.FileSlice;
@@ -210,7 +209,6 @@ public class HoodieTableMetadataUtil {
public static final String PARTITION_NAME_EXPRESSION_INDEX_PREFIX =
"expr_index_";
public static final String PARTITION_NAME_SECONDARY_INDEX =
"secondary_index";
public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX =
"secondary_index_";
- public static final SerializableFunctionUnchecked<String, String>
IDENTITY_ENCODING = key -> key;
private static final Set<Schema.Type> SUPPORTED_TYPES_PARTITION_STATS = new
HashSet<>(Arrays.asList(
Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT,
Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.NULL,
Schema.Type.BYTES));
@@ -1366,7 +1364,7 @@ public class HoodieTableMetadataUtil {
public static SerializableBiFunction<String, Integer, Integer>
getSecondaryKeyToFileGroupMappingFunction(boolean needsSecondaryKeyExtraction) {
if (needsSecondaryKeyExtraction) {
return (recordKey, numFileGroups) -> {
- String secondaryKey =
SecondaryIndexKeyUtils.getUnescapedSecondaryKeyFromSecondaryIndexKey(recordKey);
+ String secondaryKey =
SecondaryIndexKeyUtils.getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey(recordKey);
return mapRecordKeyToFileGroupIndex(secondaryKey, numFileGroups);
};
}
@@ -2716,7 +2714,8 @@ public class HoodieTableMetadataUtil {
// Fetch metadata table COLUMN_STATS partition records for above
files
List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata
= tableMetadata
.getRecordsByKeyPrefixes(
- HoodieListData.lazy(generateKeyPrefixes(colsToIndex,
partitionName)), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false,
IDENTITY_ENCODING)
+ HoodieListData.lazy(generateColumnStatsKeys(colsToIndex,
partitionName)),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
// schema and properties are ignored in getInsertValue, so
simply pass as null
.map(record ->
((HoodieMetadataPayload)record.getData()).getColumnStatMetadata())
.filter(Option::isPresent)
@@ -2767,18 +2766,25 @@ public class HoodieTableMetadataUtil {
/**
* Generate key prefixes for each combination of column name in {@param
columnsToIndex} and {@param partitionName}.
+ * @deprecated Use {@link #generateColumnStatsKeys} instead
*/
+ @Deprecated
public static List<String> generateKeyPrefixes(List<String> columnsToIndex,
String partitionName) {
- List<String> keyPrefixes = new ArrayList<>();
- PartitionIndexID partitionIndexId = new
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionName));
+ List<ColumnStatsIndexPrefixRawKey> rawKeys =
generateColumnStatsKeys(columnsToIndex, partitionName);
+ return rawKeys.stream()
+ .map(key -> key.encode())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Generate column stats index keys for each combination of column name in
{@param columnsToIndex} and {@param partitionName}.
+ */
+ public static List<ColumnStatsIndexPrefixRawKey>
generateColumnStatsKeys(List<String> columnsToIndex, String partitionName) {
+ List<ColumnStatsIndexPrefixRawKey> keys = new ArrayList<>();
for (String columnName : columnsToIndex) {
- ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
- String keyPrefix = columnIndexID.asBase64EncodedString()
- .concat(partitionIndexId.asBase64EncodedString());
- keyPrefixes.add(keyPrefix);
+ keys.add(new ColumnStatsIndexPrefixRawKey(columnName, partitionName));
}
-
- return keyPrefixes;
+ return keys;
}
private static List<HoodieColumnRangeMetadata<Comparable>>
translateWriteStatToFileStats(HoodieWriteStat writeStat,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/RawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/RawKey.java
new file mode 100644
index 000000000000..0d58727bf11d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/RawKey.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+
+/**
+ * Interface for raw keys that can be encoded for storage in the metadata
table.
+ */
+public interface RawKey extends Serializable {
+
+ /**
+ * Encode this raw key into a string for storage.
+ * @return The encoded string key
+ */
+ String encode();
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/RecordIndexRawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/RecordIndexRawKey.java
new file mode 100644
index 000000000000..23e9d127a96c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/RecordIndexRawKey.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.Objects;
+
+/**
+ * Represents a record index key that requires no encoding (identity encoding).
+ */
+public class RecordIndexRawKey implements RawKey {
+ private final String recordKey;
+
+ public RecordIndexRawKey(String recordKey) {
+ this.recordKey = Objects.requireNonNull(recordKey);
+ }
+
+ @Override
+ public String encode() {
+ // Identity encoding - return the key as-is
+ return recordKey;
+ }
+
+ public String getRecordKey() {
+ return recordKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RecordIndexRawKey that = (RecordIndexRawKey) o;
+ return Objects.equals(recordKey, that.recordKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(recordKey);
+ }
+
+ @Override
+ public String toString() {
+ return "RecordIndexRawKey{" + "recordKey='" + recordKey + '\'' + '}';
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
index e93614e719ea..2b68fba85e3d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
@@ -31,68 +31,55 @@ public class SecondaryIndexKeyUtils {
// Escape character
public static final char ESCAPE_CHAR = '\\';
- /**
- * Use this function if you want to get both record key and secondary key.
- *
- * @returns pair of secondary key, record key.
- * */
+ // Give "<encoded secondaryKey>$<encoded primaryKey>"
+ // Extract Pair<<secondaryKey>,<primaryKey>>
public static Pair<String, String> getSecondaryKeyRecordKeyPair(String
secIdxRecKey) {
int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
return Pair.of(unescapeSpecialChars(secIdxRecKey.substring(0,
delimiterIndex)), unescapeSpecialChars(secIdxRecKey.substring(delimiterIndex +
1)));
}
- /**
- * Use this function if you want to get both record key and secondary key.
- *
- * @returns pair of secondary key, record key.
- * */
+ // Give "<encoded secondaryKey>$<encoded primaryKey>"
+ // Extract Pair<<primaryKey>,<secondaryKey>>
public static Pair<String, String> getRecordKeySecondaryKeyPair(String
secIdxRecKey) {
int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
return Pair.of(unescapeSpecialChars(secIdxRecKey.substring(delimiterIndex
+ 1)), unescapeSpecialChars(secIdxRecKey.substring(0, delimiterIndex)));
}
- /**
- * Extracts the record key portion from an encoded secondary index key.
- *
- * @param secIdxRecKey the encoded key in the form
"escapedSecondaryKey$escapedRecordKey"
- * @return the unescaped record key, or {@code null} if the record key was
{@code null}
- * @throws IllegalStateException if the key format is invalid (i.e., no
unescaped separator found)
- */
+ // Give "<encoded secondaryKey>$<encoded primaryKey>"
+ // Extract <primaryKey>
public static String getRecordKeyFromSecondaryIndexKey(String secIdxRecKey) {
- // the payload key is in the format of "secondaryKey$primaryKey"
- // we need to extract the primary key from the payload key
int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
return unescapeSpecialChars(secIdxRecKey.substring(delimiterIndex + 1));
}
- /**
- * Extracts the secondary key portion from an encoded secondary index key.
- *
- * @param secIdxRecKey the encoded key in the form
"escapedSecondaryKey$escapedRecordKey"
- * @return the unescaped secondary key, or {@code null} if the secondary key
was {@code null}
- * @throws IllegalStateException if the key format is invalid (i.e., no
unescaped separator found)
- */
+ // Give "<encoded secondaryKey>$<encoded primaryKey>"
+ // Extract <secondaryKey>
public static String getSecondaryKeyFromSecondaryIndexKey(String
secIdxRecKey) {
- // the payload key is in the format of "secondaryKey$primaryKey"
- // we need to extract the secondary key from the payload key
return
unescapeSpecialChars(getUnescapedSecondaryKeyFromSecondaryIndexKey(secIdxRecKey));
}
+ // Give "<encoded secondaryKey>$<encoded primaryKey>"
+ // Extract "<encoded secondaryKey>$"
+ public static String
getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey(String secIdxRecKey) {
+ int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
+ return secIdxRecKey.substring(0, delimiterIndex + 1);
+ }
+
+ // Given <secondaryKey>
+ // Extract "<encoded secondaryKey>$"
+ public static String getEscapedSecondaryKeyPrefixFromSecondaryKey(String
secKey) {
+ return String.format("%s%s", escapeSpecialChars(secKey),
SECONDARY_INDEX_RECORD_KEY_SEPARATOR_CHAR);
+ }
+
+ // Give "<encoded secondaryKey>$<encoded primaryKey>"
+ // Extract "<encoded secondaryKey>"
public static String getUnescapedSecondaryKeyFromSecondaryIndexKey(String
secIdxRecKey) {
- // the payload key is in the format of "secondaryKey$primaryKey"
- // we need to extract the secondary key from the payload key
int delimiterIndex = getSecondaryIndexKeySeparatorPosition(secIdxRecKey);
return secIdxRecKey.substring(0, delimiterIndex);
}
- /**
- * Constructs an encoded secondary index key by escaping the given secondary
and record keys,
- * and concatenating them with the separator {@code "$"}.
- *
- * @param unescapedSecKey the secondary key (can be {@code null})
- * @param unescapedRecordKey the record key (can be {@code null})
- * @return a string representing the encoded secondary index key
- */
+ // Given <secondaryKey> and <primaryKey>
+ // construct "<encoded secondaryKey>$<encoded primaryKey>"
public static String constructSecondaryIndexKey(String unescapedSecKey,
String unescapedRecordKey) {
return escapeSpecialChars(unescapedSecKey) +
SECONDARY_INDEX_RECORD_KEY_SEPARATOR + escapeSpecialChars(unescapedRecordKey);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexPrefixRawKey.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexPrefixRawKey.java
new file mode 100644
index 000000000000..fc442e07e0fe
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexPrefixRawKey.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.Objects;
+
+/**
+ * Represents a secondary index key, whose raw content is the column value of
the data table.
+ */
+public class SecondaryIndexPrefixRawKey implements RawKey {
+ private final String secondaryKey;
+
+ public SecondaryIndexPrefixRawKey(String secondaryKey) {
+ this.secondaryKey = secondaryKey;
+ }
+
+ @Override
+ public String encode() {
+ return
SecondaryIndexKeyUtils.getEscapedSecondaryKeyPrefixFromSecondaryKey(secondaryKey);
+ }
+
+ public String getSecondaryKey() {
+ return secondaryKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SecondaryIndexPrefixRawKey that = (SecondaryIndexPrefixRawKey) o;
+ return Objects.equals(secondaryKey, that.secondaryKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(secondaryKey);
+ }
+
+ @Override
+ public String toString() {
+ return "SecondaryIndexKey{" + "secondaryKey='" + secondaryKey + '\'' + '}';
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
index c00ba7ac1556..dea2011f4a15 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
@@ -98,14 +98,14 @@ public class TestHoodieBackedTableMetadataDataCleanup {
HoodieData<String> recordKeys = HoodieListData.eager(Arrays.asList("key1",
"key2"));
// Setup mock behavior
- when(mockMetadata.getRecordsByKeys(any(), any(),
any())).thenReturn(mockPairData);
+ when(mockMetadata.readIndexRecordsWithKeys(any(),
any())).thenReturn(mockPairData);
when(mockPairData.mapToPair(any())).thenReturn(mockResult);
// Call real method on the mock
- when(mockMetadata.readRecordIndex(recordKeys)).thenCallRealMethod();
+
when(mockMetadata.readRecordIndexLocationsWithKeys(recordKeys)).thenCallRealMethod();
// Execute the method
- HoodiePairData result = mockMetadata.readRecordIndex(recordKeys);
+ HoodiePairData result =
mockMetadata.readRecordIndexLocationsWithKeys(recordKeys);
// Verify cleanup manager was invoked
verify(spyCleanupManager).ensureDataCleanupOnException(any());
@@ -124,7 +124,7 @@ public class TestHoodieBackedTableMetadataDataCleanup {
// Setup mock behavior for readIndexRecords
HoodieData mockIndexRecords = mock(HoodieData.class);
- when(mockMetadata.readIndexRecords(any(), anyString(),
any())).thenReturn(mockIndexRecords);
+ when(mockMetadata.readIndexRecords(any(),
anyString())).thenReturn(mockIndexRecords);
when(mockIndexRecords.map(any())).thenReturn(mockHoodieData);
// Call real method on the mock
@@ -155,15 +155,15 @@ public class TestHoodieBackedTableMetadataDataCleanup {
.thenReturn(HoodieIndexVersion.V2);
// Setup mock behavior for V2 path
- when(mockMetadata.getRecordKeysFromSecondaryKeysV2(any(),
anyString())).thenReturn(mockHoodieData);
+ when(mockMetadata.readSecondaryIndexDataTableRecordKeysV2(any(),
anyString())).thenReturn(mockHoodieData);
when(mockPairData.mapToPair(any())).thenReturn(mockResult);
// Call real method on the mock
- when(mockMetadata.readSecondaryIndex(secondaryKeys,
partitionName)).thenCallRealMethod();
+ when(mockMetadata.readSecondaryIndexLocationsWithKeys(secondaryKeys,
partitionName)).thenCallRealMethod();
// Execute the method - it may throw NPE due to mocks, but we just want
to verify cleanup manager is called
try {
- mockMetadata.readSecondaryIndex(secondaryKeys, partitionName);
+ mockMetadata.readSecondaryIndexLocationsWithKeys(secondaryKeys,
partitionName);
} catch (Exception e) {
// Expected - we're testing with mocks
}
@@ -189,7 +189,7 @@ public class TestHoodieBackedTableMetadataDataCleanup {
// Setup mock behavior for V2 path
when(mockPairData.values()).thenReturn(mockHoodieData);
- when(mockMetadata.getRecordKeysFromSecondaryKeysV2(any(),
anyString())).thenReturn(mockHoodieData);
+ when(mockMetadata.readSecondaryIndexDataTableRecordKeysV2(any(),
anyString())).thenReturn(mockHoodieData);
when(mockMetadata.readRecordIndexLocations(any())).thenReturn(mockHoodieData);
// Call real method on the mock
@@ -221,12 +221,12 @@ public class TestHoodieBackedTableMetadataDataCleanup {
doThrow(testException).when(mockCleanupManager).ensureDataCleanupOnException(any());
// Call real method on the mock
- when(mockMetadata.readRecordIndex(any())).thenCallRealMethod();
+
when(mockMetadata.readRecordIndexLocationsWithKeys(any())).thenCallRealMethod();
// Execute and verify exception is propagated
HoodieData<String> recordKeys =
HoodieListData.eager(Arrays.asList("key1"));
try {
- mockMetadata.readRecordIndex(recordKeys);
+ mockMetadata.readRecordIndexLocationsWithKeys(recordKeys);
fail("Expected exception was not thrown");
} catch (HoodieException e) {
assertEquals("Test exception from cleanup manager", e.getMessage());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 5900fdafbdde..b864fc29abf5 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -23,38 +23,31 @@ import
org.apache.hudi.common.function.SerializableBiFunction;
import org.junit.jupiter.api.Test;
import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.constructSecondaryIndexKey;
-import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.escapeSpecialChars;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class TestHoodieTableMetadataUtil {
@Test
public void testGetRecordKeyToFileGroupIndexFunction() {
- // Test with secondary key format
- String compositeKey = "secondaryKey$recordKey";
+ int numFileGroups = 10;
+ String recordKey = "recordKey$";
+ String secondaryKey = "secondaryKey$";
+ // Raw key used for read path
+ SecondaryIndexPrefixRawKey rawKey1 = new
SecondaryIndexPrefixRawKey(secondaryKey);
+ // Composite key used for write path
+ String compositeKey = constructSecondaryIndexKey(secondaryKey, recordKey);
SerializableBiFunction<String, Integer, Integer> hashOnSecKeyOnly =
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(true);
SerializableBiFunction<String, Integer, Integer> hashOnFullKey =
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(false);
- int result1 = hashOnSecKeyOnly.apply(compositeKey, 10);
- int result2 =
hashOnSecKeyOnly.apply("anotherSecondaryKey$anotherRecordKey", 10);
+ // On write path we use hashOnSecKeyOnly
+ int result1 = hashOnSecKeyOnly.apply(compositeKey, numFileGroups);
+ // On read path, we use hashOnFullKey
+ int result2 = hashOnFullKey.apply(rawKey1.encode(), numFileGroups);
- // Both should hash the secondary key portion
- assertNotEquals(result1, result2);
-
- // Test with regular key format
- int result3 = hashOnFullKey.apply("simpleKey", 10);
- int result4 = hashOnFullKey.apply("anotherSimpleKey", 10);
-
- // Both should hash the full key
- assertNotEquals(result3, result4);
-
- // Hash on Sec key only <=> Hash full key if key equals to UNESCAPED sec
key
- assertEquals(hashOnSecKeyOnly.apply("secKey$recKey", 10),
hashOnFullKey.apply("secKey", 10));
- assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("seckey",
"reckey"), 10), hashOnFullKey.apply("seckey", 10));
- assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("$",
"reckey"), 10), hashOnFullKey.apply(escapeSpecialChars("$"), 10));
+ // Both should hash the secondary key portion so read and write paths are
consistent.
+ assertEquals(result1, result2);
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
index 1ace3e6fa6a4..b9747306719d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestSecondaryIndexKeyUtils.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.stream.Stream;
+import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -201,8 +202,10 @@ public class TestSecondaryIndexKeyUtils {
@ParameterizedTest(name = "Key construction round-trip: secondaryKey='{0}',
recordKey='{1}'")
@MethodSource("keyConstructionRoundTripTestCases")
public void testKeyConstructionRoundTrip(String secondaryKey, String
recordKey) {
- // Construct the key
+ // Construct the key used by the writer path
String constructedKey =
SecondaryIndexKeyUtils.constructSecondaryIndexKey(secondaryKey, recordKey);
+ // The key used by the reader path and the key used by the writer path
have the following invariant.
+ assertEquals(new SecondaryIndexPrefixRawKey(secondaryKey).encode(),
getUnescapedSecondaryKeyPrefixFromSecondaryIndexKey(constructedKey));
// Extract both parts
String extractedSecondary =
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(constructedKey);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index 0c1c296feab4..06b92ba229d4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -28,8 +28,8 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -63,7 +63,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_DATA_TYPE;
import static
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_TARGET_POS;
import static
org.apache.hudi.source.stats.ColumnStatsSchemas.METADATA_DATA_TYPE;
@@ -389,15 +388,15 @@ public class FileStatsIndex implements ColumnStatsIndex {
"Column stats is only valid when push down filters have referenced
columns");
// Read Metadata Table's column stats Flink's RowData list by
- // - Fetching the records by key-prefixes (encoded column names)
+ // - Fetching the records by key-prefixes (column names)
// - Deserializing fetched records into [[RowData]]s
- // TODO encoding should be done internally w/in HoodieBackedTableMetadata
- List<String> encodedTargetColumnNames = Arrays.stream(targetColumns)
- .map(colName -> new
ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
+ List<ColumnStatsIndexPrefixRawKey> rawKeys = Arrays.stream(targetColumns)
+ .map(ColumnStatsIndexPrefixRawKey::new) // Just column name, no
partition
+ .collect(Collectors.toList());
HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
getMetadataTable().getRecordsByKeyPrefixes(
- HoodieListData.lazy(encodedTargetColumnNames),
getIndexPartitionName(), false, IDENTITY_ENCODING);
+ HoodieListData.lazy(rawKeys), getIndexPartitionName(), false);
org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter
converter =
AvroToRowDataConverters.createRowConverter((RowType)
METADATA_DATA_TYPE.getLogicalType());
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 39edfd2c52f6..fe8bf929a0fc 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -758,29 +758,29 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
// Test case 2: Secondary index record key with version >= 2
Arguments.of(
"Secondary index record key with version >= 2",
- "primary_key$secondary_key",
+ "secondary_key$primary_key",
10,
"secondary_index_idx_ts",
V2,
- 6 // Uses secondary key portion for hashing
+ 8 // Uses secondary key portion for hashing
),
// Test case 3: Secondary index record key but version < 2
Arguments.of(
"Secondary index record key but version < 2",
- "primary_key$secondary_key",
+ "secondary_key$primary_key",
10,
"secondary_index_idx_ts",
HoodieIndexVersion.V1,
- 4 // Uses full key for hashing
+ 0 // Uses full key for hashing
),
// Test case 4: Secondary index record key but not in secondary index
partition
Arguments.of(
"Secondary index record key but not in secondary index partition",
- "primary_key$secondary_key",
+ "secondary_key$primary_key",
10,
"files",
HoodieIndexVersion.V1,
- 4 // Uses full key for hashing since not in secondary index
partition
+ 0 // Uses full key for hashing since not in secondary index
partition
),
// Test case 7: Single file group
Arguments.of(
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 204a50ca2a03..0a1aed48efaf 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
import org.apache.hudi.data.HoodieJavaRDD
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.metadata.{ColumnStatsIndexPrefixRawKey,
HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil,
MetadataPartitionType}
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
import org.apache.hudi.util.JFunction
@@ -342,23 +342,19 @@ class ColumnStatsIndexSupport(spark: SparkSession,
// - Filtering out nulls
checkState(targetColumns.nonEmpty)
- // TODO encoding should be done internally w/in HoodieBackedTableMetadata
- val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
- // encode column name and parition name if partition list is available
- val keyPrefixes = if (prunedPartitions.isDefined) {
- prunedPartitions.get.map(partitionPath =>
- new
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
- ).flatMap(encodedPartition => {
- encodedTargetColumnNames.map(encodedTargetColumn =>
encodedTargetColumn.concat(encodedPartition))
- })
+ // Create raw key prefixes based on column names and optional partition
names
+ val rawKeys = if (prunedPartitions.isDefined) {
+ val partitionsList = prunedPartitions.get.toList
+ targetColumns.flatMap(colName =>
+ partitionsList.map(partitionPath => new
ColumnStatsIndexPrefixRawKey(colName, partitionPath))
+ )
} else {
- encodedTargetColumnNames
+ targetColumns.map(colName => new ColumnStatsIndexPrefixRawKey(colName))
}
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
- HoodieListData.eager(keyPrefixes.toSeq.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
- HoodieTableMetadataUtil.IDENTITY_ENCODING)
+ HoodieListData.eager(rawKeys.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index f81bda62ac38..e4544c82e70e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.index.expression.HoodieExpressionIndex
-import org.apache.hudi.metadata.{HoodieMetadataPayload,
HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.metadata.{ColumnStatsIndexPrefixRawKey,
HoodieMetadataPayload, HoodieTableMetadataUtil, MetadataPartitionType}
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
import org.apache.hudi.util.JFunction
@@ -344,23 +344,19 @@ class ExpressionIndexSupport(spark: SparkSession,
// - Filtering out nulls
checkState(targetColumns.nonEmpty)
- // TODO encoding should be done internally w/in HoodieBackedTableMetadata
- val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
- // encode column name and parition name if partition list is available
- val keyPrefixes = if (prunedPartitions.isDefined) {
- prunedPartitions.get.map(partitionPath =>
- new
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
- ).flatMap(encodedPartition => {
- encodedTargetColumnNames.map(encodedTargetColumn =>
encodedTargetColumn.concat(encodedPartition))
- })
+ // Create raw key prefixes based on column names and optional partition
names
+ val rawKeys = if (prunedPartitions.isDefined) {
+ val partitionsList = prunedPartitions.get.toList
+ targetColumns.flatMap(colName =>
+ partitionsList.map(partitionPath => new
ColumnStatsIndexPrefixRawKey(colName, partitionPath))
+ )
} else {
- encodedTargetColumnNames
+ targetColumns.map(colName => new ColumnStatsIndexPrefixRawKey(colName))
}
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
- HoodieListData.eager(keyPrefixes.toSeq.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
- HoodieTableMetadataUtil.IDENTITY_ENCODING)
+ HoodieListData.eager(rawKeys.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
@@ -504,9 +500,11 @@ class ExpressionIndexSupport(spark: SparkSession,
private def loadExpressionIndexPartitionStatRecords(indexDefinition:
HoodieIndexDefinition, shouldReadInMemory: Boolean):
HoodieData[HoodieMetadataColumnStats] = {
// We are omitting the partition name and only using the column name and
expression index partition stat prefix to fetch the records
- val recordKeyPrefix =
getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX,
indexDefinition.getSourceFields.get(0))
+    // Create a ColumnStatsIndexPrefixRawKey with the column and the partition-stat prefix
+ val rawKey = new
ColumnStatsIndexPrefixRawKey(indexDefinition.getSourceFields.get(0),
+
toJavaOption(Option(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX)))
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadExpressionIndexForColumnsInternal(
- indexDefinition.getIndexName, shouldReadInMemory, Seq(recordKeyPrefix))
+ indexDefinition.getIndexName, shouldReadInMemory, Seq(rawKey))
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
colStatsRecords
}
@@ -543,28 +541,28 @@ class ExpressionIndexSupport(spark: SparkSession,
indexPartition: String,
shouldReadInMemory:
Boolean): HoodieData[HoodieMetadataColumnStats] = {
// Read Metadata Table's Expression Index records into [[HoodieData]]
container by
- // - Fetching the records from CSI by key-prefixes (encoded column
names)
+ // - Fetching the records from CSI by key-prefixes (column names)
// - Extracting [[HoodieMetadataColumnStats]] records
// - Filtering out nulls
checkState(targetColumns.nonEmpty)
- val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
- val keyPrefixes = if (prunedPartitions.nonEmpty) {
- prunedPartitions.map(partitionPath =>
- new
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
- ).flatMap(encodedPartition => {
- encodedTargetColumnNames.map(encodedTargetColumn =>
encodedTargetColumn.concat(encodedPartition))
- })
+
+ // Create raw key prefixes based on column names and optional partition
names
+ val rawKeys = if (prunedPartitions.nonEmpty) {
+ val partitionsList = prunedPartitions.toList
+ targetColumns.flatMap(colName =>
+ partitionsList.map(partitionPath => new
ColumnStatsIndexPrefixRawKey(colName, partitionPath))
+ )
} else {
- encodedTargetColumnNames
+ targetColumns.map(colName => new ColumnStatsIndexPrefixRawKey(colName))
}
- loadExpressionIndexForColumnsInternal(indexPartition, shouldReadInMemory,
keyPrefixes)
+
+ loadExpressionIndexForColumnsInternal(indexPartition, shouldReadInMemory,
rawKeys)
}
- private def loadExpressionIndexForColumnsInternal(indexPartition: String,
shouldReadInMemory: Boolean, keyPrefixes: Iterable[String] with (String with
Int => Any)) = {
+ private def loadExpressionIndexForColumnsInternal(indexPartition: String,
shouldReadInMemory: Boolean, keyPrefixes:
Iterable[ColumnStatsIndexPrefixRawKey]) = {
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
- HoodieListData.eager(keyPrefixes.toSeq.asJava), indexPartition,
shouldReadInMemory,
- HoodieTableMetadataUtil.IDENTITY_ENCODING)
+ HoodieListData.eager(keyPrefixes.toSeq.asJava), indexPartition,
shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 5b5fcfeb12d4..0208628d4e73 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
-import org.apache.hudi.metadata.{HoodieMetadataPayload,
HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.{ColumnStatsIndexPrefixRawKey,
HoodieMetadataPayload, HoodieTableMetadataUtil}
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
import org.apache.hudi.util.JFunction
@@ -68,12 +68,12 @@ class PartitionStatsIndexSupport(spark: SparkSession,
override def loadColumnStatsIndexRecords(targetColumns: Seq[String],
prunedPartitions: Option[Set[String]] = None, shouldReadInMemory: Boolean):
HoodieData[HoodieMetadataColumnStats] = {
checkState(targetColumns.nonEmpty)
- val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
- logDebug(s"Loading column stats for columns: ${targetColumns.mkString(",
")}, Encoded column names: ${encodedTargetColumnNames.mkString(", ")}")
+ logDebug(s"Loading column stats for columns: ${targetColumns.mkString(",
")}")
+ // For partition stats, we only need column names (no partition name)
+ val rawKeys = targetColumns.map(colName => new
ColumnStatsIndexPrefixRawKey(colName))
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
- HoodieListData.eager(encodedTargetColumnNames.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory,
- HoodieTableMetadataUtil.IDENTITY_ENCODING)
+ HoodieListData.eager(rawKeys.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 9a5b2eefdbc2..c7fa20cf302d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -75,7 +75,7 @@ class RecordLevelIndexSupport(spark: SparkSession,
* @return Sequence of file names which need to be queried
*/
private def getCandidateFilesForRecordKeys(allFiles: Seq[StoragePath],
recordKeys: List[String]): Set[String] = {
- val recordIndexData = metadataTable.readRecordIndex(
+ val recordIndexData = metadataTable.readRecordIndexLocationsWithKeys(
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(recordKeys).asJava))
try {
val recordKeyLocationsMap =
HoodieDataUtils.dedupeAndCollectAsMap(recordIndexData)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 1bc7b4f91b75..0a56f7f247b5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -79,7 +79,7 @@ class SecondaryIndexSupport(spark: SparkSession,
* @return Sequence of file names which need to be queried
*/
private def getCandidateFilesFromSecondaryIndex(allFiles: Seq[StoragePath],
secondaryKeys: List[String], secondaryIndexName: String): Set[String] = {
- val secondaryIndexData = metadataTable.readSecondaryIndex(
+ val secondaryIndexData = metadataTable.readSecondaryIndexLocationsWithKeys(
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava),
secondaryIndexName)
try {
val recordKeyLocationsMap =
HoodieDataUtils.dedupeAndCollectAsMap(secondaryIndexData)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 85609d2bf253..c659697be80e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -89,8 +89,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
-import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -104,9 +102,11 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
+import org.apache.hudi.metadata.RawKey;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -194,7 +194,6 @@ import static
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_D
import static
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -1192,7 +1191,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieTableMetadata metadataReader =
metaClient.getTableFormat().getMetadataFactory().create(
context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath());
HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData1 =
metadataReader
-
.readRecordIndex(HoodieListData.eager(records1.stream().map(HoodieRecord::getRecordKey)
+
.readRecordIndexLocationsWithKeys(HoodieListData.eager(records1.stream().map(HoodieRecord::getRecordKey)
.collect(Collectors.toList())));
Map<String, HoodieRecordGlobalLocation> result;
try {
@@ -1203,7 +1202,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData2 =
metadataReader
-
.readRecordIndex(HoodieListData.eager(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList())));
+
.readRecordIndexLocationsWithKeys(HoodieListData.eager(records2.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList())));
try {
result = HoodieDataUtils.dedupeAndCollectAsMap(recordIndexData2);
assertEquals(records2.size(), result.size(), "RI should return entries
in the commit.");
@@ -2012,19 +2011,20 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieTableMetadata tableMetadata = metadata(client, storage);
// prefix search for column (_hoodie_record_key)
- ColumnIndexID columnIndexID = new
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ ColumnStatsIndexPrefixRawKey columnKey = new
ColumnStatsIndexPrefixRawKey(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result =
tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
+ HoodieListData.lazy(Collections.singletonList(columnKey)),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should
be 6.
assertEquals(result.size(), 6);
// prefix search for col(_hoodie_record_key) and first partition. only 2
files should be matched
- PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+ ColumnStatsIndexPrefixRawKey columnWithPartitionKey = new
ColumnStatsIndexPrefixRawKey(HoodieRecord.RECORD_KEY_METADATA_FIELD,
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
+
HoodieListData.lazy(Collections.singletonList(columnWithPartitionKey)),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -2040,10 +2040,11 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
});
// prefix search for column {commit time} and first partition
- columnIndexID = new
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+ ColumnStatsIndexPrefixRawKey commitTimeKey = new
ColumnStatsIndexPrefixRawKey(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
+ HoodieListData.lazy(Collections.singletonList(commitTimeKey)),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
@@ -3030,9 +3031,16 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// assert entry is not present for deleted partition in metadata table
HoodieTableMetadata tableMetadata = metadata(client, storage);
+ // Create a simple RawKey implementation for the partition path
+ RawKey partitionKey = new RawKey() {
+ @Override
+ public String encode() {
+ return HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+ }
+ };
assertTrue(tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(Collections.singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
- FILES.getPartitionPath(), false, IDENTITY_ENCODING).isEmpty());
+ HoodieListData.lazy(Collections.singletonList(partitionKey)),
+ FILES.getPartitionPath(), false).isEmpty());
assertTrue(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH));
assertFalse(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
// above upsert would have triggered clean
@@ -3623,7 +3631,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieTableMetadata metadataReader =
metaClient.getTableFormat().getMetadataFactory().create(
context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath());
Map<String, HoodieRecordGlobalLocation> result =
HoodieDataUtils.dedupeAndCollectAsMap(
- metadataReader.readRecordIndex(
+ metadataReader.readRecordIndexLocationsWithKeys(
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
assertEquals(allRecords.size(), result.size(), "RI should have mapping
for all the records in firstCommit");
@@ -3639,7 +3647,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// RI should not return mappings for deleted records
metadataReader = metaClient.getTableFormat().getMetadataFactory().create(
context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath());
- result =
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndex(
+ result =
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndexLocationsWithKeys(
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
assertEquals(allRecords.size() - recordsToDelete.size(), result.size(),
"RI should not have mapping for deleted records");
result.keySet().forEach(mappingKey ->
assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for
deleted records"));
@@ -3660,7 +3668,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// RI should not return mappings for deleted records
metadataReader =
metaClient.getTableFormat().getMetadataFactory().create(context, storage,
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
Map<String, HoodieRecordGlobalLocation> result =
- HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndex(
+
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndexLocationsWithKeys(
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
assertEquals(allRecords.size() - keysToDelete.size(), result.size(), "RI
should not have mapping for deleted records");
@@ -3671,7 +3679,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// New mappings should have been created for re-inserted records and
should map to the new commit time
metadataReader =
metaClient.getTableFormat().getMetadataFactory().create(context, storage,
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
- result =
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndex(
+ result =
HoodieDataUtils.dedupeAndCollectAsMap(metadataReader.readRecordIndexLocationsWithKeys(
HoodieListData.eager(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()))));
assertEquals(allRecords.size(), result.size(), "RI should have mappings
for re-inserted records");
for (String reInsertedKey : keysToDelete) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 43729d13dcd8..3fd94ad5cb3a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -127,7 +127,7 @@ class RecordLevelIndexTestBase extends
HoodieStatsIndexTestBase {
val readDf = spark.read.format("hudi").load(basePath)
readDf.cache()
val rowArr = readDf.collect()
- val recordIndexMap =
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndex(
+ val recordIndexMap =
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndexLocationsWithKeys(
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(rowArr.map(row =>
row.getAs("_hoodie_record_key").toString).toList).asJava)))
assertTrue(rowArr.length > 0)
@@ -141,7 +141,7 @@ class RecordLevelIndexTestBase extends
HoodieStatsIndexTestBase {
}
val deletedRows = deletedDf.collect()
- val recordIndexMapForDeletedRows =
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndex(
+ val recordIndexMapForDeletedRows =
HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndexLocationsWithKeys(
HoodieListData.eager(JavaConverters.seqAsJavaListConverter(deletedRows.map(row
=> row.getAs("_row_key").toString).toList).asJava)))
assertEquals(0, recordIndexMapForDeletedRows.size(), "deleted records
should not present in RLI")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index 4aeee163aba4..e1ece6d88ecb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -186,7 +186,7 @@ class TestMetadataRecordIndex extends
HoodieSparkClientTestBase {
val metadata = metadataWriter(writeConfig).getTableMetadata
val readDf = spark.read.format("hudi").load(basePath)
val rowArr = readDf.collect()
- val res = metadata.readRecordIndex(
+ val res = metadata.readRecordIndexLocationsWithKeys(
HoodieListData.eager(rowArr.map(row =>
row.getAs("_hoodie_record_key").toString).toList.asJava));
val recordIndexMap = HoodieDataUtils.dedupeAndCollectAsMap(res);
res.unpersistWithDependencies()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 8cc34c7aa756..aedd18e0ead3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -792,7 +792,8 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
private def testSecondaryIndexWithNullableColumns(tableType: String): Unit =
{
withSparkSqlSessionConfig(
HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "9",
- "hoodie.embed.timeline.server" -> "false") {
+ "hoodie.embed.timeline.server" -> "false",
+ "hoodie.parquet.small.file.limit" -> "0") {
withTempDir { tmp =>
val tableName = generateTableName + s"_nullable_${tableType}"
val basePath = s"${tmp.getCanonicalPath}/$tableName"
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
index ac6641d81644..88a2c13436ed 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/testHoodieBackedTableMetadataIndexLookup.scala
@@ -64,7 +64,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
|) using hudi
| options (
| primaryKey ='id',
- | type = 'cow',
+ | type = 'mor',
| preCombineField = 'ts',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
@@ -123,6 +123,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
// Create shared temporary directory
tmpDir = Utils.createTempDir()
+ spark.sql("set hoodie.parquet.small.file.limit=0")
// Setup shared test data
setupSharedTestData()
}
@@ -170,6 +171,9 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
spark.sql(s"insert into $tableName" + " values('$a', 'sec$key', 40, 1001)")
spark.sql(s"insert into $tableName" + " values('a$a', '$sec$', 50, 1002)")
spark.sql(s"insert into $tableName" + " values('$$', '$$', 60, 1003)")
+ // generate some deleted records
+ spark.sql(s"insert into $tableName" + " values('$$3', '$$', 60, 1003)")
+ spark.sql(s"delete from $tableName" + " where id = '$$3'")
val props = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
@@ -259,14 +263,14 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
cleanUpCachedRDDs()
// Case 1: Empty input
- val emptyResultRDD = hoodieBackedTableMetadata.readRecordIndex(HoodieListData.eager(List.empty[String].asJava))
+ val emptyResultRDD = hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(HoodieListData.eager(List.empty[String].asJava))
val emptyResult = emptyResultRDD.collectAsList()
assert(emptyResult.isEmpty, "Empty input should return empty result")
emptyResultRDD.unpersistWithDependencies()
// Case 2: All existing keys including those with $ characters
val allKeys = HoodieListData.eager(List("a1", "a2", "a$", "$a", "a$a",
"$$").asJava)
- val allResultRDD = hoodieBackedTableMetadata.readRecordIndex(allKeys)
+ val allResultRDD = hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(allKeys)
val allResult = allResultRDD.collectAsList().asScala
allResultRDD.unpersistWithDependencies()
// Validate keys including special characters
@@ -293,14 +297,14 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
// Case 3: Non-existing keys, some matches the prefix of the existing
records.
val nonExistKeys = HoodieListData.eager(List("", "a", "a100", "200", "$",
"a$$", "$$a", "$a$").asJava)
- val nonExistResultRDD = hoodieBackedTableMetadata.readRecordIndex(nonExistKeys)
+ val nonExistResultRDD = hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(nonExistKeys)
val nonExistResult = nonExistResultRDD.collectAsList().asScala
assert(nonExistResult.isEmpty, "Non-existing keys should return empty
result")
nonExistResultRDD.unpersistWithDependencies()
// Case 4: Mix of existing and non-existing keys
val mixedKeys = HoodieListData.eager(List("a1", "a100", "a2",
"a200").asJava)
- val mixedResultRDD = hoodieBackedTableMetadata.readRecordIndex(mixedKeys)
+ val mixedResultRDD = hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(mixedKeys)
val mixedResult = mixedResultRDD.collectAsList().asScala
val mixedResultKeys = mixedResult.map(_.getKey()).toSet
assert(mixedResultKeys == Set("a1", "a2"), "Should only return existing
keys")
@@ -308,7 +312,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
// Case 5: Duplicate keys including those with $ characters
val dupKeys = HoodieListData.eager(List("a1", "a1", "a2", "a2", "a$",
"a$", "$a", "a$a", "a$a", "$a", "$$", "$$").asJava)
- val dupResultRDD = hoodieBackedTableMetadata.readRecordIndex(dupKeys)
+ val dupResultRDD = hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(dupKeys)
val dupResult = dupResultRDD.collectAsList().asScala
val dupResultKeys = dupResult.map(_.getKey()).toSet
assert(dupResultKeys == Set("a1", "a2", "a$", "$a", "a$a", "$$"), "Should
deduplicate keys including those with $")
@@ -318,7 +322,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
jsc = new JavaSparkContext(spark.sparkContext)
context = new HoodieSparkEngineContext(jsc, new SQLContext(spark))
val rddKeys = HoodieJavaRDD.of(List("a1", "a2", "a$").asJava, context, 2)
- val rddResult = hoodieBackedTableMetadata.readRecordIndex(rddKeys)
+ val rddResult = hoodieBackedTableMetadata.readRecordIndexLocationsWithKeys(rddKeys)
val rddResultKeys = rddResult.map(_.getKey()).collectAsList().asScala.toSet
assert(rddResultKeys == Set("a1", "a2", "a$"), "Should deduplicate keys
including those with $")
rddResult.unpersistWithDependencies()
@@ -427,7 +431,7 @@ abstract class HoodieBackedTableMetadataIndexLookupTestBase
extends HoodieSparkS
// Test with existing secondary keys including those with $ characters
val existingKeys = HoodieListData.eager(List("b1", "b2", "b$", "b$", "b1",
"$$", "sec$key", "$sec$", "$$", null).asJava)
- val result = hoodieBackedTableMetadata.getSecondaryIndexRecords(existingKeys, secondaryIndexName)
+ val result = hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(existingKeys, secondaryIndexName)
val resultMap = HoodieDataUtils.collectPairDataAsMap(result)
assert(resultMap.size == 6, s"Should return 6 results for existing
secondary keys in table version ${getTableVersion}")
@@ -441,14 +445,14 @@ abstract class
HoodieBackedTableMetadataIndexLookupTestBase extends HoodieSparkS
// Test with non-existing secondary keys
val nonExistingKeys = HoodieListData.eager(List("", "b", "$", " ", null,
"non_exist_1", "non_exist_2").asJava)
- val nonExistingResult = hoodieBackedTableMetadata.getSecondaryIndexRecords(nonExistingKeys, secondaryIndexName)
+ val nonExistingResult = hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(nonExistingKeys, secondaryIndexName)
val nonExistingMap =
HoodieDataUtils.collectPairDataAsMap(nonExistingResult)
assert(nonExistingMap.isEmpty, s"Should return empty result for
non-existing secondary keys in table version ${getTableVersion}")
nonExistingResult.unpersistWithDependencies()
// Test with a mixture of existing and non-existing secondary keys
val rddKeys = HoodieJavaRDD.of(List("b1", "b2", "b$", null, "$").asJava,
context, 2)
- val rddResult = hoodieBackedTableMetadata.getSecondaryIndexRecords(rddKeys, secondaryIndexName)
+ val rddResult = hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(rddKeys, secondaryIndexName)
// Collect and validate results
val rddResultMap = HoodieDataUtils.collectPairDataAsMap(rddResult)
@@ -610,7 +614,7 @@ class HoodieBackedTableMetadataIndexLookupV9TestBase
extends HoodieBackedTableMe
}
// Test getSecondaryIndexRecords API with null value
- val nullRecordsResult = hoodieBackedTableMetadata.getSecondaryIndexRecords(nullKeys, secondaryIndexName)
+ val nullRecordsResult = hoodieBackedTableMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(nullKeys, secondaryIndexName)
val nullRecordsMap =
HoodieDataUtils.collectPairDataAsMap(nullRecordsResult)
nullRecordsResult.unpersistWithDependencies()
// Verify that null key maps to record keys.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b5b0fb072459..49327fa731d5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1139,7 +1139,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
try {
for (int i = 0; i < numPartitions; i++) {
List<String> secKeys = secondaryKeys.collectPartitions(new int[]
{i})[0];
- HoodiePairData<String, String> secondaryIndexData = ((HoodieBackedTableMetadata) metadataContext.tableMetadata).getSecondaryIndexRecords(
+ HoodiePairData<String, String> secondaryIndexData = ((HoodieBackedTableMetadata) metadataContext.tableMetadata).readSecondaryIndexDataTableRecordKeysWithKeys(
HoodieListData.lazy(secKeys), indexDefinition.getIndexName());
try {
Map<String, Set<String>> mdtSecondaryKeyToRecordKeys =
HoodieDataUtils.collectPairDataAsMap(secondaryIndexData);