hudi-bot opened a new issue, #17063:
URL: https://github.com/apache/hudi/issues/17063
As of today, all the code paths that involve mapPartitions underneath diverge
between JavaRDD and HoodieListData/HoodieListPairData.
We need to add mapPartitions capability to the two classes. We also need a
partitioner interface, following the same idea as RDDs for how partitioning
works. Maybe we also need to abstract the partitioner.
We already have code that requires mapPartitions in index lookup, and today
we do hacky things to achieve it:
{code:java}
@Override
protected Map<String, HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeys(List<String> keys, String partitionName) {
if (keys.isEmpty()) {
return Collections.emptyMap();
}
Map<String, HoodieRecord<HoodieMetadataPayload>> result;
// Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
final int numFileSlices = partitionFileSlices.size();
checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
// Lookup keys from each file slice
if (numFileSlices == 1) {
// Optimization for a single slice for smaller metadata table partitions
result = lookupKeys(partitionName, keys, partitionFileSlices.get(0));
} else {
// Parallel lookup for large sized partitions with many file slices
// Partition the keys by the file slice which contains it
ArrayList<ArrayList<String>> partitionedKeys =
partitionKeysByFileSlices(keys, numFileSlices); <---- We do partition by in
MetadataTable Class
result = new HashMap<>(keys.size());
getEngineContext().setJobStatus(this.getClass().getSimpleName(),
"Reading keys from metadata table partition " + partitionName);
getEngineContext().map(partitionedKeys, keysList -> {
if (keysList.isEmpty()) {
return Collections.<String,
HoodieRecord<HoodieMetadataPayload>>emptyMap();
}
int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0),
numFileSlices);
return lookupKeys(partitionName, keysList,
partitionFileSlices.get(shardIndex));
}, partitionedKeys.size()).forEach(result::putAll);
}
return result;
}
/**
 * Buckets the given record keys by the file slice (shard) that owns each key.
 *
 * @param keys          record keys to bucket
 * @param numFileSlices total number of file slices (shards) in the partition
 * @return a list of {@code numFileSlices} buckets where bucket {@code i} holds
 *         every key whose file group index is {@code i}; buckets may be empty
 */
private static ArrayList<ArrayList<String>> partitionKeysByFileSlices(List<String> keys, int numFileSlices) {
  // One (initially empty) bucket per file slice.
  ArrayList<ArrayList<String>> buckets = new ArrayList<>(numFileSlices);
  for (int slice = 0; slice < numFileSlices; slice++) {
    buckets.add(new ArrayList<>());
  }
  // Route every key to the bucket of the shard that owns it.
  for (String key : keys) {
    int shard = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);
    buckets.get(shard).add(key);
  }
  return buckets;
}
{code}
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-9542
- Type: Bug
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]