This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ff60d3213651 [HUDI-9548] Secondary index lookup should do escape ->
hash -> sort for read write (#13521)
ff60d3213651 is described below
commit ff60d32136516bb8f652471db9d2373b00ded458
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Wed Jul 23 18:26:12 2025 -0700
[HUDI-9548] Secondary index lookup should do escape -> hash -> sort for
read write (#13521)
---
.../hudi/client/TestJavaHoodieBackedMetadata.java | 7 +-
.../client/utils/SparkMetadataWriterUtils.java | 3 +-
.../hudi/common/table/view/NoOpTableMetadata.java | 2 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 10 ++-
.../metadata/FileSystemBackedTableMetadata.java | 2 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 94 ++++++++++++++--------
.../apache/hudi/metadata/HoodieTableMetadata.java | 2 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 +-
.../hudi/metadata/SecondaryIndexKeyUtils.java | 8 +-
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 24 ++++--
.../apache/hudi/source/stats/FileStatsIndex.java | 4 +-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 2 +-
.../org/apache/hudi/ExpressionIndexSupport.scala | 4 +-
.../apache/hudi/PartitionStatsIndexSupport.scala | 2 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 9 ++-
15 files changed, 115 insertions(+), 66 deletions(-)
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index cd55d027d6f5..0c9e4df761da 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -164,6 +164,7 @@ import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
@@ -1473,7 +1474,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
ColumnIndexID columnIndexID = new
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result =
tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
Option.empty()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should
be 6.
assertEquals(result.size(), 6);
@@ -1485,7 +1486,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
Option.empty()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -1505,7 +1506,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
columnIndexID = new
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result = tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
Option.empty()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 3920ea2c50ae..cddb24c23981 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -108,6 +108,7 @@ import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_
import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemViewForMetadataTable;
@@ -401,7 +402,7 @@ public class SparkMetadataWriterUtils {
// Fetch EI column stat records for above files
List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
tableMetadata.getRecordsByKeyPrefixes(
-
HoodieListData.lazy(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
partitionName)), indexPartition, false, Option.empty())
+
HoodieListData.lazy(HoodieTableMetadataUtil.generateKeyPrefixes(validColumnsToIndex,
partitionName)), indexPartition, false, IDENTITY_ENCODING)
// schema and properties are ignored in getInsertValue, so
simply pass as null
.map(record -> record.getData().getInsertValue(null, null))
.filter(Option::isPresent)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 6f0b9d0695c3..6c7cc33f934f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -118,7 +118,7 @@ class NoOpTableMetadata implements HoodieTableMetadata {
public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
String partitionName,
boolean shouldLoadInMemory,
-
Option<SerializableFunctionUnchecked<String, String>> keyEncoder) {
+
SerializableFunctionUnchecked<String, String> keyEncoder) {
throw new HoodieMetadataException("Unsupported operation:
getRecordsByKeyPrefixes!");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 0b071b5bfd5c..1519e38de244 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -66,6 +66,8 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
+
/**
* Abstract class for implementing common table metadata operations.
*/
@@ -210,7 +212,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
List<String> partitionIDFileIDStringsList = new
ArrayList<>(partitionIDFileIDStrings);
Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
HoodieDataUtils.dedupeAndCollectAsMap(
-
getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList),
metadataPartitionName, Option.empty()));
+
getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList),
metadataPartitionName, IDENTITY_ENCODING));
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
timer.endTimer()));
metrics.ifPresent(m ->
m.setMetric(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_FILE_COUNT_STR,
partitionIDFileIDStringsList.size()));
@@ -330,7 +332,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
Map<String, HoodieRecord<HoodieMetadataPayload>> partitionIdRecordPairs =
HoodieDataUtils.dedupeAndCollectAsMap(
getRecordsByKeys(HoodieListData.eager(new
ArrayList<>(partitionIdToPathMap.keySet())),
- MetadataPartitionType.FILES.getPartitionPath(),
Option.empty()));
+ MetadataPartitionType.FILES.getPartitionPath(),
IDENTITY_ENCODING));
metrics.ifPresent(
m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR,
timer.endTimer()));
@@ -387,7 +389,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
HoodieDataUtils.dedupeAndCollectAsMap(
getRecordsByKeys(
- HoodieListData.eager(columnStatKeylist),
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), Option.empty()));
+ HoodieListData.eager(columnStatKeylist),
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), IDENTITY_ENCODING));
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR,
timer.endTimer()));
Map<Pair<String, String>, List<HoodieMetadataColumnStats>>
fileToColumnStatMap = new HashMap<>();
for (final Map.Entry<String, HoodieRecord<HoodieMetadataPayload>> entry :
hoodieRecords.entrySet()) {
@@ -436,7 +438,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
* @return A collection of pairs (key -> record)
*/
public abstract HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(
- HoodieData<String> keys, String partitionName,
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn);
+ HoodieData<String> keys, String partitionName,
SerializableFunctionUnchecked<String, String> keyEncodingFn);
/**
* Returns a collection of pairs (secondary-key -> set-of-record-keys) for
the provided secondary keys.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 06097655e7ca..6b01180696af 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -314,7 +314,7 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
String partitionName,
boolean shouldLoadInMemory,
-
Option<SerializableFunctionUnchecked<String, String>> keyEncoder) {
+
SerializableFunctionUnchecked<String, String> keyEncoder) {
throw new HoodieMetadataException("Unsupported operation:
getRecordsByKeyPrefixes!");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 79964298cf1b..8d199f3ecff5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -95,12 +95,12 @@ import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BAS
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemViewForMetadataTable;
-import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.unescapeSpecialChars;
/**
* Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -168,7 +168,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key, String partitionName) {
List<HoodieRecord<HoodieMetadataPayload>> records = getRecordsByKeys(
- HoodieListData.eager(Collections.singletonList(key)), partitionName,
Option.empty())
+ HoodieListData.eager(Collections.singletonList(key)), partitionName,
IDENTITY_ENCODING)
.values().collectAsList();
ValidationUtils.checkArgument(records.size() <= 1, "Found more than 1
record for record key " + key);
return records.isEmpty() ? Option.empty() :
Option.ofNullable(records.get(0));
@@ -213,12 +213,10 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
String partitionName,
boolean shouldLoadInMemory,
-
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
ValidationUtils.checkState(keyPrefixes instanceof HoodieListData,
"getRecordsByKeyPrefixes only support HoodieListData at the moment");
- // Apply key encoding if present
- List<String> sortedKeyPrefixes = keyEncodingFn.isPresent()
- ? new ArrayList<>(keyPrefixes.map(k ->
keyEncodingFn.get().apply(k)).collectAsList())
- : new ArrayList<>(keyPrefixes.collectAsList());
+ // Apply key encoding
+ List<String> sortedKeyPrefixes = new
ArrayList<>(keyPrefixes.map(keyEncodingFn::apply).collectAsList());
// Sort the prefixes so that keys are looked up in order
// Sort must come after encoding.
Collections.sort(sortedKeyPrefixes);
@@ -263,12 +261,29 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return Predicates.startsWithAny(null, right);
}
+ /**
+ * All keys to be looked up go through the following steps:
+ * 1. [encode] escape/encode the key if needed
+ * 2. [hash to file group] compute the hash of the key to determine its target file group
+ * 3. [lookup within file groups] lookup the key in the file group
+ * 4. the record is returned
+ */
+ /**
+ * Performs lookup of records in the metadata table.
+ *
+ * @param keys The keys to look up in the metadata table
+ * @param partitionName The name of the metadata partition to search in
+ * @param fileSlices The list of file slices to search through
+ * @param isSecondaryIndex Whether this lookup is for a secondary index
+ * @param keyEncodingFn Function to encode keys before lookup; pass IDENTITY_ENCODING for no encoding
+ * @return Pair data containing the looked up records keyed by their
original keys
+ */
private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
doLookup(HoodieData<String> keys, String partitionName, List<FileSlice>
fileSlices,
-
boolean isSecondaryIndex, Option<SerializableFunctionUnchecked<String, String>>
keyEncodingFn) {
+
boolean isSecondaryIndex, SerializableFunctionUnchecked<String, String>
keyEncodingFn) {
final int numFileSlices = fileSlices.size();
if (numFileSlices == 1) {
- List<String> keysList = keyEncodingFn.isPresent() ? keys.map(k ->
keyEncodingFn.get().apply(k)).collectAsList() : keys.collectAsList();
+ List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
return lookupKeyRecordPairs(partitionName, new
ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex);
}
@@ -277,12 +292,14 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// SI write path concatenates secKey$recordKey, the secKey needs extracted
for hashing;
// SI read path gives secKey only, no need for secKey extraction.
SerializableBiFunction<String, Integer, Integer> mappingFunction =
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
- keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction);
+ keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction, keyEncodingFn);
HoodiePairData<Integer, String> persistedInitialPairData = keys
- // Tag key with file group index and apply key encoding
- .mapToPair(recordKey -> new ImmutablePair<>(
- mappingFunction.apply(recordKey, numFileSlices),
- keyEncodingFn.isPresent() ? keyEncodingFn.get().apply(recordKey) :
recordKey));
+ // Tag key with file group index
+ .mapToPair(recordKey -> {
+ String encodedKey = keyEncodingFn.apply(recordKey);
+ // Always encode the key before applying the mapping.
+ return new ImmutablePair<>(mappingFunction.apply(encodedKey,
numFileSlices), encodedKey);
+ });
persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
// Use the new processValuesOfTheSameShards API instead of explicit
rangeBasedRepartitionForEachKey
SerializableFunction<Iterator<String>, Iterator<Pair<String,
HoodieRecord<HoodieMetadataPayload>>>> processFunction =
@@ -295,7 +312,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
distinctSortedKeyIter.forEachRemaining(keysList::add);
}
- FileSlice fileSlice =
fileSlices.get(mappingFunction.apply(unescapeSpecialChars(keysList.get(0)),
numFileSlices));
+ FileSlice fileSlice =
fileSlices.get(mappingFunction.apply(keysList.get(0), numFileSlices));
return lookupKeyRecordPairsItr(partitionName, keysList, fileSlice,
!isSecondaryIndex);
};
@@ -307,12 +324,19 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return result.filter((String k, HoodieRecord<HoodieMetadataPayload> v) ->
!v.getData().isDeleted());
}
+ /**
+ * All keys to be looked up go through the following steps:
+ * 1. [encode] escape/encode the key if needed
+ * 2. [hash to file group] compute the hash of the key to determine its target file group
+ * 3. [lookup within file groups] lookup the key in the file group
+ * 4. the record is returned
+ */
private HoodieData<HoodieRecord<HoodieMetadataPayload>>
doLookupIndexRecords(HoodieData<String> keys, String partitionName,
List<FileSlice> fileSlices,
-
boolean isSecondaryIndex, Option<SerializableFunctionUnchecked<String, String>>
keyEncodingFn) {
+
boolean isSecondaryIndex, SerializableFunctionUnchecked<String, String>
keyEncodingFn) {
final int numFileSlices = fileSlices.size();
if (numFileSlices == 1) {
- List<String> keysList = keyEncodingFn.isPresent() ? keys.map(k ->
keyEncodingFn.get().apply(k)).collectAsList() : keys.collectAsList();
+ List<String> keysList = keys.map(keyEncodingFn::apply).collectAsList();
TreeSet<String> distinctSortedKeys = new TreeSet<>(keysList);
return lookupRecords(partitionName, new ArrayList<>(distinctSortedKeys),
fileSlices.get(0), !isSecondaryIndex);
}
@@ -321,12 +345,14 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// SI write path concatenates secKey$recordKey, the secKey needs extracted
for hashing;
// SI read path gives secKey only, no need for secKey extraction.
SerializableBiFunction<String, Integer, Integer> mappingFunction =
HoodieTableMetadataUtil::mapRecordKeyToFileGroupIndex;
- keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction);
+ keys = repartitioningIfNeeded(keys, partitionName, numFileSlices,
mappingFunction, keyEncodingFn);
HoodiePairData<Integer, String> persistedInitialPairData = keys
// Tag key with file group index
- .mapToPair(recordKey -> new ImmutablePair<>(
- mappingFunction.apply(recordKey, numFileSlices),
- keyEncodingFn.isPresent() ? keyEncodingFn.get().apply(recordKey) :
recordKey));
+ .mapToPair(recordKey -> {
+ String encodedKey = keyEncodingFn.apply(recordKey);
+ // Always encode the key before applying the mapping.
+ return new ImmutablePair<>(mappingFunction.apply(encodedKey,
numFileSlices), encodedKey);
+ });
persistedInitialPairData.persist("MEMORY_AND_DISK_SER");
// Use the new processValuesOfTheSameShards API instead of explicit
rangeBasedRepartitionForEachKey
@@ -340,7 +366,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
distinctSortedKeyIter.forEachRemaining(keysList::add);
}
- FileSlice fileSlice =
fileSlices.get(mappingFunction.apply(unescapeSpecialChars(keysList.get(0)),
numFileSlices));
+ FileSlice fileSlice =
fileSlices.get(mappingFunction.apply(keysList.get(0), numFileSlices));
return lookupRecordsItr(partitionName, keysList, fileSlice,
!isSecondaryIndex);
};
List<Integer> keySpace = IntStream.range(0,
numFileSlices).boxed().collect(Collectors.toList());
@@ -367,7 +393,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
"Record index is not initialized in MDT");
// TODO [HUDI-9544]: Metric does not work for rdd based API due to lazy
evaluation.
- return getRecordsByKeys(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), Option.empty())
+ return getRecordsByKeys(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
.mapToPair((Pair<String, HoodieRecord<HoodieMetadataPayload>> p) ->
Pair.of(p.getLeft(), p.getRight().getData().getRecordGlobalLocation()));
}
@@ -386,7 +412,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// The caller is required to check for record index existence in MDT
before calling this method.
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
"Record index is not initialized in MDT");
- return readIndexRecords(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), Option.empty())
+ return readIndexRecords(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), IDENTITY_ENCODING)
.map(r -> r.getData().getRecordGlobalLocation());
}
@@ -432,7 +458,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
@Override
public HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(
- HoodieData<String> keys, String partitionName,
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+ HoodieData<String> keys, String partitionName,
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
List<FileSlice> fileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
checkState(!fileSlices.isEmpty(), "No file slices found for partition: " +
partitionName);
@@ -442,7 +468,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
public HoodieData<String>
getRecordKeysFromSecondaryKeysV2(HoodieData<String> secondaryKeys, String
partitionName) {
- return readIndexRecords(secondaryKeys, partitionName,
Option.of(SecondaryIndexKeyUtils::escapeSpecialChars)).map(
+ return readIndexRecords(secondaryKeys, partitionName,
SecondaryIndexKeyUtils::escapeSpecialChars).map(
hoodieRecord ->
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(hoodieRecord.getRecordKey()));
}
@@ -469,7 +495,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
protected HoodieData<HoodieRecord<HoodieMetadataPayload>>
readIndexRecords(HoodieData<String> keys,
String partitionName,
-
Option<SerializableFunctionUnchecked<String, String>> keyEncodingFn) {
+
SerializableFunctionUnchecked<String, String> keyEncodingFn) {
List<FileSlice> fileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
checkState(!fileSlices.isEmpty(), "No file slices found for partition: " +
partitionName);
@@ -481,16 +507,18 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// When testing we noticed that the parallelism can be very low which hurts
the performance. so we should start with a reasonable
// level of parallelism in that case.
private HoodieData<String> repartitioningIfNeeded(
- HoodieData<String> keys, String partitionName, int numFileSlices,
SerializableBiFunction<String, Integer, Integer> mappingFunction) {
+ HoodieData<String> keys, String partitionName, int numFileSlices,
SerializableBiFunction<String, Integer, Integer> mappingFunction,
+ SerializableFunctionUnchecked<String, String> keyEncodingFn) {
if (keys instanceof HoodieListData) {
- int parallelism = (int) keys.map(k -> mappingFunction.apply(k,
numFileSlices)).distinct().count();
+ int parallelism;
+ parallelism = (int) keys.map(k ->
mappingFunction.apply(keyEncodingFn.apply(k),
numFileSlices)).distinct().count();
// In case of empty lookup set, we should avoid RDD with 0 partitions.
parallelism = Math.max(parallelism, 1);
- LOG.info("getRecordFast repartition HoodieListData to JavaRDD: exit,
partitionName {}, num partitions: {}",
+ LOG.info("Repartitioning keys for partition {} from list data with
parallelism: {}",
partitionName, parallelism);
keys = getEngineContext().parallelize(keys.collectAsList(), parallelism);
} else if (keys.getNumPartitions() <
metadataConfig.getRepartitionMinPartitionsThreshold()) {
- LOG.info("getRecordFast repartition HoodieNonListData. partitionName {},
num partitions: {}", partitionName,
metadataConfig.getRepartitionDefaultPartitions());
+ LOG.info("Repartitioning keys for partition {} to {} partitions",
partitionName, metadataConfig.getRepartitionDefaultPartitions());
keys =
keys.repartition(metadataConfig.getRepartitionDefaultPartitions());
}
return keys;
@@ -732,7 +760,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return HoodieListPairData.eager(Collections.emptyList());
}
- Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys,
partitionName, false, Option.of(SecondaryIndexKeyUtils::escapeSpecialChars))
+ Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys,
partitionName, false, SecondaryIndexKeyUtils::escapeSpecialChars)
.map(record -> {
if (!record.getData().isDeleted()) {
return
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(record.getRecordKey());
@@ -761,7 +789,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
if (secondaryKeys.isEmpty()) {
return HoodieListPairData.eager(Collections.emptyList());
}
- return readIndexRecords(secondaryKeys, partitionName,
Option.of(SecondaryIndexKeyUtils::escapeSpecialChars))
+ return readIndexRecords(secondaryKeys, partitionName,
SecondaryIndexKeyUtils::escapeSpecialChars)
.filter(hoodieRecord -> !hoodieRecord.getData().isDeleted())
.mapToPair(hoodieRecord ->
SecondaryIndexKeyUtils.getRecordKeySecondaryKeyPair(hoodieRecord.getRecordKey()))
.groupByKey()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 1e6e5c466319..8035d31af5c2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -265,7 +265,7 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
String partitionName,
boolean shouldLoadInMemory,
-
Option<SerializableFunctionUnchecked<String, String>> keyEncoder);
+
SerializableFunctionUnchecked<String, String> keyEncoder);
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 568c54975342..535dedf148b1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.FileSlice;
@@ -210,6 +211,7 @@ public class HoodieTableMetadataUtil {
public static final String PARTITION_NAME_EXPRESSION_INDEX_PREFIX =
"expr_index_";
public static final String PARTITION_NAME_SECONDARY_INDEX =
"secondary_index";
public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX =
"secondary_index_";
+ public static final SerializableFunctionUnchecked<String, String>
IDENTITY_ENCODING = key -> key;
private static final Set<Schema.Type> SUPPORTED_TYPES_PARTITION_STATS = new
HashSet<>(Arrays.asList(
Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT,
Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.NULL,
Schema.Type.BYTES));
@@ -1343,7 +1345,7 @@ public class HoodieTableMetadataUtil {
* (either all containing the secondary index separator or none containing
it).
* <p>
* For secondary index partitions (version >= 2), if the record keys contain
the secondary index separator,
- * the secondary key portion is used for hashing. Otherwise, the full record
key is used.
+ * the unescaped secondary key portion is used for hashing. Otherwise, the
full record key is used.
*
* @param needsSecondaryKeyExtraction Whether to extract secondary key from
composite keys (should be determined by caller)
*
@@ -1352,7 +1354,7 @@ public class HoodieTableMetadataUtil {
public static SerializableBiFunction<String, Integer, Integer>
getSecondaryKeyToFileGroupMappingFunction(boolean needsSecondaryKeyExtraction) {
if (needsSecondaryKeyExtraction) {
return (recordKey, numFileGroups) -> {
- String secondaryKey =
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(recordKey);
+ String secondaryKey =
SecondaryIndexKeyUtils.getUnescapedSecondaryKeyFromSecondaryIndexKey(recordKey);
return mapRecordKeyToFileGroupIndex(secondaryKey, numFileGroups);
};
}
@@ -2716,7 +2718,7 @@ public class HoodieTableMetadataUtil {
// Fetch metadata table COLUMN_STATS partition records for above
files
List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata
= tableMetadata
.getRecordsByKeyPrefixes(
- HoodieListData.lazy(generateKeyPrefixes(colsToIndex,
partitionName)), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false,
Option.empty())
+ HoodieListData.lazy(generateKeyPrefixes(colsToIndex,
partitionName)), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false,
IDENTITY_ENCODING)
// schema and properties are ignored in getInsertValue, so
simply pass as null
.map(record ->
((HoodieMetadataPayload)record.getData()).getColumnStatMetadata())
.filter(Option::isPresent)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
index 19a3bded29b1..e6c759b5fde1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexKeyUtils.java
@@ -53,10 +53,16 @@ public class SecondaryIndexKeyUtils {
}
public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
+ // the payload key is in the format of "secondaryKey$primaryKey"
+ // we need to extract the secondary key from the payload key
+ return
unescapeSpecialChars(getUnescapedSecondaryKeyFromSecondaryIndexKey(key));
+ }
+
+ public static String getUnescapedSecondaryKeyFromSecondaryIndexKey(String
key) {
// the payload key is in the format of "secondaryKey$primaryKey"
// we need to extract the secondary key from the payload key
int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
- return unescapeSpecialChars(key.substring(0, delimiterIndex));
+ return key.substring(0, delimiterIndex);
}
public static String constructSecondaryIndexKey(String secondaryKey, String
recordKey) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index e7f50dbb820e..5900fdafbdde 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -22,31 +22,39 @@ import
org.apache.hudi.common.function.SerializableBiFunction;
import org.junit.jupiter.api.Test;
+import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.constructSecondaryIndexKey;
+import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.escapeSpecialChars;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class TestHoodieTableMetadataUtil {
@Test
- public void testGetRecordKeyToFileGroupIndexFunctionOptimized() {
+ public void testGetRecordKeyToFileGroupIndexFunction() {
// Test with secondary key format
String compositeKey = "secondaryKey$recordKey";
- SerializableBiFunction<String, Integer, Integer> optimizedFunction =
+ SerializableBiFunction<String, Integer, Integer> hashOnSecKeyOnly =
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(true);
+ SerializableBiFunction<String, Integer, Integer> hashOnFullKey =
+
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(false);
- int result1 = optimizedFunction.apply(compositeKey, 10);
- int result2 =
optimizedFunction.apply("anotherSecondaryKey$anotherRecordKey", 10);
+ int result1 = hashOnSecKeyOnly.apply(compositeKey, 10);
+ int result2 =
hashOnSecKeyOnly.apply("anotherSecondaryKey$anotherRecordKey", 10);
// Both should hash the secondary key portion
assertNotEquals(result1, result2);
// Test with regular key format
- optimizedFunction =
HoodieTableMetadataUtil.getSecondaryKeyToFileGroupMappingFunction(false);
-
- int result3 = optimizedFunction.apply("simpleKey", 10);
- int result4 = optimizedFunction.apply("anotherSimpleKey", 10);
+ int result3 = hashOnFullKey.apply("simpleKey", 10);
+ int result4 = hashOnFullKey.apply("anotherSimpleKey", 10);
// Both should hash the full key
assertNotEquals(result3, result4);
+
+ // Hashing on the sec key only <=> hashing on the full key when the full
+ // key equals the ESCAPED sec key
+ assertEquals(hashOnSecKeyOnly.apply("secKey$recKey", 10),
hashOnFullKey.apply("secKey", 10));
+ assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("seckey",
"reckey"), 10), hashOnFullKey.apply("seckey", 10));
+ assertEquals(hashOnSecKeyOnly.apply(constructSecondaryIndexKey("$",
"reckey"), 10), hashOnFullKey.apply(escapeSpecialChars("$"), 10));
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index 073491639cfc..0c1c296feab4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
@@ -64,6 +63,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_DATA_TYPE;
import static
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_TARGET_POS;
import static
org.apache.hudi.source.stats.ColumnStatsSchemas.METADATA_DATA_TYPE;
@@ -397,7 +397,7 @@ public class FileStatsIndex implements ColumnStatsIndex {
HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
getMetadataTable().getRecordsByKeyPrefixes(
- HoodieListData.lazy(encodedTargetColumnNames),
getIndexPartitionName(), false, Option.empty());
+ HoodieListData.lazy(encodedTargetColumnNames),
getIndexPartitionName(), false, IDENTITY_ENCODING);
org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter
converter =
AvroToRowDataConverters.createRowConverter((RowType)
METADATA_DATA_TYPE.getLogicalType());
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 60d4df93c18f..204a50ca2a03 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -358,7 +358,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
HoodieListData.eager(keyPrefixes.toSeq.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
- toJavaOption(Option.empty[SerializableFunctionUnchecked[String,
String]]))
+ HoodieTableMetadataUtil.IDENTITY_ENCODING)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 0dab9f6b3f65..f81bda62ac38 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -360,7 +360,7 @@ class ExpressionIndexSupport(spark: SparkSession,
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
HoodieListData.eager(keyPrefixes.toSeq.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory,
- toJavaOption(Option.empty[SerializableFunctionUnchecked[String,
String]]))
+ HoodieTableMetadataUtil.IDENTITY_ENCODING)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
@@ -564,7 +564,7 @@ class ExpressionIndexSupport(spark: SparkSession,
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
HoodieListData.eager(keyPrefixes.toSeq.asJava), indexPartition,
shouldReadInMemory,
- toJavaOption(Option.empty[SerializableFunctionUnchecked[String,
String]]))
+ HoodieTableMetadataUtil.IDENTITY_ENCODING)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index f00cd9c20f48..5b5fcfeb12d4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -73,7 +73,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
metadataTable.getRecordsByKeyPrefixes(
HoodieListData.eager(encodedTargetColumnNames.asJava),
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, shouldReadInMemory,
- toJavaOption(Option.empty[SerializableFunctionUnchecked[String,
String]]))
+ HoodieTableMetadataUtil.IDENTITY_ENCODING)
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 8f3492e75189..3d0661b5e22a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -193,6 +193,7 @@ import static
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_D
import static
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.IDENTITY_ENCODING;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -1999,7 +2000,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
ColumnIndexID columnIndexID = new
ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result =
tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString())),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
Option.empty()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should
be 6.
assertEquals(result.size(), 6);
@@ -2008,7 +2009,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
Option.empty()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -2027,7 +2028,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
columnIndexID = new
ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result = tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()))),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
Option.empty()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true,
IDENTITY_ENCODING).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
@@ -3016,7 +3017,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieTableMetadata tableMetadata = metadata(client, storage);
assertTrue(tableMetadata.getRecordsByKeyPrefixes(
HoodieListData.lazy(Collections.singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)),
- FILES.getPartitionPath(), false, Option.empty()).isEmpty());
+ FILES.getPartitionPath(), false, IDENTITY_ENCODING).isEmpty());
assertTrue(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH));
assertFalse(tableMetadata.getAllPartitionPaths().contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
// above upsert would have triggered clean