danny0405 commented on code in PR #17770:
URL: https://github.com/apache/hudi/pull/17770#discussion_r2667043645
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineLoaderV2.java:
##########
@@ -84,44 +87,43 @@ public void loadInstants(HoodieTableMetaClient metaClient,
// Sort files in reverse chronological order if limit is specified (newest first for limit queries)
if (hasLimit) {
- filteredFiles.sort(Comparator.comparing((String fileName) -> {
- return LSMTimeline.getMaxInstantTime(fileName);
- }).reversed());
+ filteredFiles.sort(Comparator.comparing(LSMTimeline::getMaxInstantTime).reversed());
}
Schema readSchema = LSMTimeline.getReadSchema(loadMode);
// Use serial stream when limit is involved to guarantee order
- java.util.stream.Stream<String> fileStream = hasLimit
+ Stream<String> fileStream = hasLimit
? filteredFiles.stream()
: filteredFiles.parallelStream();
- fileStream.forEach(fileName -> {
+ return Option.fromJavaOptional(fileStream.map(fileName -> {
if (hasLimit && loadedCount.get() >= limit.get()) {
- return;
+ return null;
}
// Read the archived file
try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(metaClient.getStorage())
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, new StoragePath(metaClient.getArchivePath(), fileName))) {
//TODO boundary to revisit in later pr to use HoodieSchema directly
try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieSchema.fromAvroSchema(HoodieLSMTimelineInstant.getClassSchema()),
- HoodieSchema.fromAvroSchema(readSchema))) {
+ HoodieSchema.fromAvroSchema(readSchema))) {
+ int instantTimeFieldPosition = readSchema.getField(INSTANT_TIME_ARCHIVED_META_FIELD).pos();
while (iterator.hasNext() && (!hasLimit || loadedCount.get() < limit.get())) {
GenericRecord record = (GenericRecord) iterator.next();
- String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
- if ((filter == null || filter.isInRange(instantTime))
Review Comment:
Yeah, I have had the solution in this PR in mind for some time, so +1 to
this. Should we also remove the filter for the v1 loader?
Another option is to figure out the earliest instant time from the list of
archived files that fall in the query time range in `CompletionTimeQueryViewV2`.
That would let us avoid changing the loaders, which seems preferable for 2 reasons:
1. the v1 loader has no file skipping, so the read cost is always there;
2. per-record filtering still makes sense for query scenarios other than
`CompletionTimeQueryViewV2`, such as timeline loading.
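
To make the alternative above concrete, here is a rough, standalone sketch of deriving the earliest instant time from file names alone, without touching the loaders. It is not Hudi code: the class `EarliestInstantSketch` and its helpers are hypothetical, and it assumes archived file names follow a `<minInstant>_<maxInstant>_<level>.parquet` layout and that instant times are equal-length strings that compare lexicographically.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not part of Hudi.
public class EarliestInstantSketch {

  // ASSUMPTION: file names look like "<minInstant>_<maxInstant>_<level>.parquet".
  static String minInstant(String fileName) {
    return fileName.split("_")[0];
  }

  static String maxInstant(String fileName) {
    return fileName.split("_")[1];
  }

  // A file can contain instants in [start, end] only if its own
  // [min, max] range overlaps the query range.
  static boolean overlaps(String fileName, String start, String end) {
    return minInstant(fileName).compareTo(end) <= 0
        && maxInstant(fileName).compareTo(start) >= 0;
  }

  // Smallest min-instant among the files that overlap the query range.
  // This is a lower bound: it may be earlier than `start` when a file
  // straddles the start of the range.
  static Optional<String> earliestInstant(List<String> fileNames, String start, String end) {
    return fileNames.stream()
        .filter(f -> overlaps(f, start, end))
        .map(EarliestInstantSketch::minInstant)
        .min(String::compareTo);
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "001_010_0.parquet", "011_020_0.parquet", "021_030_0.parquet");
    System.out.println(earliestInstant(files, "015", "025"));
  }
}
```

With a bound like this computed in the query view, the loaders could keep their per-record filter for other callers, and the view would simply pass a range starting at the computed instant.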