linliu-code commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2114196157
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -292,36 +319,49 @@ private static ArrayList<ArrayList<String>>
partitionKeysByFileSlices(List<Strin
return partitionedKeys;
}
- /**
- * Lookup list of keys from a single file slice.
- *
- * @param partitionName Name of the partition
- * @param keys The list of keys to lookup
- * @param fileSlice The file slice to read
- * @return A {@code Map} of key name to {@code HoodieRecord} for the keys
which were found in the file slice
- */
- private Map<String, HoodieRecord<HoodieMetadataPayload>>
lookupKeysFromFileSlice(String partitionName, List<String> keys, FileSlice
fileSlice) {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
getOrCreateReaders(partitionName, fileSlice);
+ private Map<String, HoodieRecord<HoodieMetadataPayload>>
lookupKeysWithFileGroupReader(String partitionName,
+
List<String> keys,
+
FileSlice fileSlice) {
try {
- HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
- if (baseFileReader == null && logRecordScanner == null) {
- return Collections.emptyMap();
- }
-
      // Sort the keys here once so that we don't need to sort them individually
for the base file and for each individual log file.
List<String> sortedKeys = new ArrayList<>(keys);
+      // The keys are sorted using their natural ordering.
Collections.sort(sortedKeys);
- boolean fullKeys = true;
- List<Long> timings = new ArrayList<>(1);
- Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
readLogRecords(logRecordScanner, sortedKeys, fullKeys, timings);
- return readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeys,
fullKeys, logRecords, timings, partitionName);
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for " + keys.size() + " key : ", ioe);
- } finally {
- if (!reuse) {
- closeReader(readers);
+ Option<HoodieInstant> latestMetadataInstant =
+
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ String latestMetadataInstantTime =
+
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+ Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+ // Only those log files which have a corresponding completed instant on
the dataset should be read
+ // This is because the metadata table is updated before the dataset
instants are committed.
+ Set<String> validInstantTimestamps = getValidInstantTimestamps();
+ InstantRange instantRange = InstantRange.builder()
+ .rangeType(InstantRange.RangeType.EXACT_MATCH)
+ .explicitInstants(validInstantTimestamps).build();
+
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
getFileGroupReader(
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]