Davis-Zhang-Onehouse commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2178371643
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -277,61 +268,296 @@ private Predicate
transformKeyPrefixesToPredicate(List<String> keyPrefixes) {
return Predicates.startsWithAny(null, right);
}
- @Override
- protected Map<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(List<String> keys, String partitionName) {
- if (keys.isEmpty()) {
- return Collections.emptyMap();
  /**
   * Looks up metadata records for the given keys, routing each key to the file slice
   * (shard) that owns it via a key-to-file-group mapping function.
   *
   * @param partitionName    metadata table partition to read from
   * @param fileSlices       latest file slices (shards) of the partition
   * @param isSecondaryIndex whether this lookup targets a secondary index partition;
   *                         note the negation is forwarded to the lookup helpers
   * @param keyEncodingFn    optional function applied to keys before the physical lookup
   * @return pair data of key -> metadata record, with deleted records filtered out
   */
  private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> doLookupWithMapping(
      HoodieData<String> keys, String partitionName, List<FileSlice> fileSlices, boolean isSecondaryIndex,
      Option<SerializableFunction<String, String>> keyEncodingFn) {

    final int numFileSlices = fileSlices.size();
    if (numFileSlices == 1) {
      // Single-shard fast path: no key-to-shard mapping needed; TreeSet both
      // de-duplicates and sorts the keys before the lookup.
      TreeSet<String> distinctSortedKeys = new TreeSet<>(keys.collectAsList());
      return lookupRecordsWithMapping(partitionName, new ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex, keyEncodingFn);
    }
    HoodieIndexVersion indexVersion = existingIndexVersionOrDefault(partitionName, metadataMetaClient);
    // Maps a record key to the index of the file slice (shard) that owns it.
    SerializableBiFunction<String, Integer, Integer> mappingFunction =
        HoodieTableMetadataUtil.getRecordKeyToFileGroupIndexFunction(partitionName, indexVersion, false);
    keys = repartitioningIfNeeded(keys, partitionName, numFileSlices, indexVersion);
    if (keys instanceof HoodieListData) {
      // Local (in-memory) path: bucket keys per file slice, then look up each
      // bucket in parallel via the engine context.
      Map<String, HoodieRecord<HoodieMetadataPayload>> result = new HashMap<>();
      List<String> keyList = keys.collectAsList();
      ArrayList<ArrayList<String>> partitionedKeys = partitionKeysByFileSlices(keyList, numFileSlices, partitionName, indexVersion);
      List<HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>>> partialResults =
          getEngineContext().map(partitionedKeys, keysList -> {
            if (keysList.isEmpty()) {
              return HoodieListPairData.eager(Collections.emptyList());
            }
            // All keys in this bucket map to the same shard, so the first key
            // suffices to resolve the target file slice.
            int shardIndex = mappingFunction.apply(keysList.get(0), numFileSlices);
            // Keys are sorted before lookup; NOTE(review): unlike the
            // single-slice path above, this branch does not de-duplicate —
            // presumably partitionKeysByFileSlices or the lookup tolerates
            // duplicates; confirm.
            Collections.sort(keysList);
            return lookupRecordsWithMapping(partitionName, keysList, fileSlices.get(shardIndex), !isSecondaryIndex, keyEncodingFn);
          }, partitionedKeys.size());

      // Merge the per-shard results into a single map (duplicate keys collapse
      // via Map.put), then filter out tombstoned records.
      partialResults.stream()
          .flatMap(p -> p.collectAsList().stream())
          .forEach(pair -> result.put(pair.getKey(), pair.getValue()));
      return HoodieDataUtils.eagerMapKV(result)
          .filter((String k, HoodieRecord<HoodieMetadataPayload> v) -> !v.getData().isDeleted());
    } else {
      // Distributed path: sort/dedup/repartition so that each data partition
      // holds keys belonging to exactly one file slice, then look up per
      // partition without collecting to the driver.
      keys = adaptiveSortDedupRepartition(keys, partitionName, numFileSlices, indexVersion, metadataConfig);
      return keys.mapPartitions(iter -> {
        if (!iter.hasNext()) {
          return Collections.emptyIterator();
        }
        List<String> sortedKeys = new ArrayList<>();
        iter.forEachRemaining(sortedKeys::add);
        // Partitioning guarantees all keys here share one shard; resolve the
        // file slice from the first key.
        FileSlice fileSlice = fileSlices.get(mappingFunction.apply(sortedKeys.get(0), numFileSlices));
        return lookupRecordsWithMappingIter(partitionName, sortedKeys, fileSlice, !isSecondaryIndex, keyEncodingFn);
      }, false)
          .mapToPair(e -> new ImmutablePair<>(e.getKey(), e.getValue()))
          .filter((String k, HoodieRecord<HoodieMetadataPayload> v) -> !v.getData().isDeleted());
    }
  }
- // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
- List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
- k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
- final int numFileSlices = partitionFileSlices.size();
- checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
+ private HoodieData<HoodieRecord<HoodieMetadataPayload>>
doLookupWithoutMapping(
+ HoodieData<String> keys, String partitionName, List<FileSlice>
fileSlices, boolean isSecondaryIndex,
+ Option<SerializableFunction<String, String>> keyEncodingFn) {
- // Lookup keys from each file slice
+ final int numFileSlices = fileSlices.size();
if (numFileSlices == 1) {
- // Optimization for a single slice for smaller metadata table partitions
- result = lookupKeys(partitionName, keys, partitionFileSlices.get(0));
+ TreeSet<String> distinctSortedKeys = new TreeSet<>(keys.collectAsList());
+ return lookupRecordsWithoutMapping(partitionName, new
ArrayList<>(distinctSortedKeys), fileSlices.get(0), !isSecondaryIndex,
keyEncodingFn);
+ }
+
+ HoodieIndexVersion indexVersion =
existingIndexVersionOrDefault(partitionName, metadataMetaClient);
Review Comment:
Nice catch — even the functional test with exhaustive validation didn't
catch this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]