manojpec commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r796898658
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +128,43 @@ private void initIfNeeded() {
     return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue();
   }
 
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+  @Override
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
+                                                                                             String partitionName) {
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSlices(partitionName, keys);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
+    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
+      Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(partitionName,
+          partitionFileSlicePair.getRight());
+      try {
+        List<Long> timings = new ArrayList<>();
+        HoodieFileReader baseFileReader = readers.getKey();
+        HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyList();
-      }
+        if (baseFileReader == null && logRecordScanner == null) {
+          return;
+        }
-      // local map to assist in merging with base file records
-      Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, keys, timings);
-      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(
-          baseFileReader, keys, logRecords, timings, partitionName);
-      LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings));
-      return result;
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        close(partitionName);
+        // local map to assist in merging with base file records
+        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner,
+            fileSliceKeys, timings);
+        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, logRecords,
+            timings, partitionName));
+        LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+            fileSliceKeys.size(), timings));
+        fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
+      } catch (IOException ioe) {
+        throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
+      } finally {
+        if (!reuse) {
+          close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId()));
+        }
       }
-    }
+    });
+
+    ValidationUtils.checkState(keys.size() == fileSlicesKeysCount.get());
Review comment:
When we discussed this, I said we do return empty when the key is not present. I take that back. Here we are building the key-to-file-slice mapping via HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(), which always returns the file slice a key belongs to. So the validation at line 166 holds good here: the preceding calls never return a partial set of keys or an empty result.
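To illustrate the invariant behind the `ValidationUtils.checkState` call, here is a minimal standalone sketch. The helper `mapKeyToFileGroupIndex` below is a hypothetical stand-in for `HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex()` (a simple hash, not the real Hudi implementation): because the mapping is a total function, grouping keys by file group assigns every key to exactly one group, so summing the per-group key counts must reproduce the original key count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class KeyToSliceInvariant {

    // Hypothetical stand-in for HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex():
    // a deterministic, total function, so every key maps to some file group index.
    static int mapKeyToFileGroupIndex(String key, int numFileGroups) {
        return Math.abs(key.hashCode() % numFileGroups);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("key1", "key2", "key3", "key4");
        int numFileGroups = 3;

        // Group keys per file group, mirroring partitionFileSliceToKeysMap in the PR.
        Map<Integer, List<String>> sliceToKeys = new HashMap<>();
        for (String k : keys) {
            sliceToKeys
                .computeIfAbsent(mapKeyToFileGroupIndex(k, numFileGroups), i -> new ArrayList<>())
                .add(k);
        }

        // Process each group, tallying keys with an AtomicInteger as the PR does.
        AtomicInteger fileSlicesKeysCount = new AtomicInteger();
        sliceToKeys.forEach((idx, sliceKeys) -> fileSlicesKeysCount.addAndGet(sliceKeys.size()));

        // The invariant: the grouping neither drops nor duplicates keys.
        if (keys.size() != fileSlicesKeysCount.get()) {
            throw new IllegalStateException("key count mismatch");
        }
        System.out.println("invariant holds for " + fileSlicesKeysCount.get() + " keys");
    }
}
```

This is why the check does not need a "partial keys" branch: a key can fail to produce a record, but it can never fail to be assigned to a file slice.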
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]