codope commented on code in PR #5208:
URL: https://github.com/apache/hudi/pull/5208#discussion_r843747665
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -133,28 +142,79 @@ private void initIfNeeded() {
   }
   @Override
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
-                                                                                             String partitionName) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
+                                                                                 String partitionName) {
+    // NOTE: Since we partition records to a particular file-group by full key, we will have
+    //       to scan all file-groups for all key-prefixes as each of these might contain some
+    //       records matching the key-prefix
+    List<FileSlice> partitionFileSlices =
+        HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
+
+    return engineContext.parallelize(partitionFileSlices)
+        .flatMap(
+            (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
+              // NOTE: Since this will be executed by executors, we can't access previously cached
+              //       readers, and therefore have to always open new ones
+              Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+                  openReaders(partitionName, fileSlice);
+              try {
+                List<Long> timings = new ArrayList<>();
+
+                HoodieFileReader baseFileReader = readers.getKey();
+                HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+
+                if (baseFileReader == null && logRecordScanner == null) {
+                  // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
+                  return Collections.emptyIterator();
+                }
+
+                boolean fullKeys = false;
+
+                Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+                    readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);
+
+                List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+                LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+                    keyPrefixes.size(), timings));
+
+                return mergedRecords.iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from metadata table for " + keyPrefixes.size() + " key : ", ioe);
+              } finally {
+                close(Pair.of(partitionName, fileSlice.getFileId()));
Review Comment:
That's right. I've made the change.