the-other-tim-brown commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2109194158
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -210,38 +222,83 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(L
getEngineContext().parallelize(partitionFileSlices))
.flatMap(
(SerializableFunction<FileSlice,
Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
- // NOTE: Since this will be executed by executors, we can't
access previously cached
- // readers, and therefore have to always open new ones
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>
readers =
- openReaders(partitionName, fileSlice);
- try {
- List<Long> timings = new ArrayList<>();
-
- HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataLogRecordReader logRecordScanner =
readers.getRight();
-
- if (baseFileReader == null && logRecordScanner == null) {
- // TODO: what do we do if both does not exist? should we
throw an exception and let caller do the fallback ?
- return Collections.emptyIterator();
- }
+ return getByKeyPrefixesWithFileGroupReader(fileSlice,
sortedKeyPrefixes, partitionName);
+ });
+ }
- boolean fullKeys = false;
+ private Iterator<HoodieRecord<HoodieMetadataPayload>>
getByKeyPrefixesWithFileGroupReader(FileSlice fileSlice,
+
List<String> sortedKeyPrefixes,
+
String partitionName) throws IOException {
+ Option<HoodieInstant> latestMetadataInstant =
+
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ String latestMetadataInstantTime =
+
latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+ Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+ // Only those log files which have a corresponding completed instant on
the dataset should be read
+ // This is because the metadata table is updated before the dataset
instants are committed.
+ Set<String> validInstantTimestamps = getValidInstantTimestamps();
+ InstantRange instantRange = InstantRange.builder()
+ .rangeType(InstantRange.RangeType.EXACT_MATCH)
+ .explicitInstants(validInstantTimestamps).build();
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader = getFileGroupReader(
+ metadataMetaClient.getTableConfig(),
+ latestMetadataInstantTime,
+ fileSlice,
+ schema,
+ schema,
+ metadataMetaClient,
+ new TypedProperties(),
Review Comment:
The props passed here are also used to configure the spillable map in the record buffer, so an empty `TypedProperties` should not be used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]