the-other-tim-brown commented on code in PR #17770:
URL: https://github.com/apache/hudi/pull/17770#discussion_r2663546494


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineLoaderV2.java:
##########
@@ -84,44 +87,43 @@ public void loadInstants(HoodieTableMetaClient metaClient,
 
       // Sort files in reverse chronological order if limit is specified 
(newest first for limit queries)
       if (hasLimit) {
-        filteredFiles.sort(Comparator.comparing((String fileName) -> {
-          return LSMTimeline.getMaxInstantTime(fileName);
-        }).reversed());
+        
filteredFiles.sort(Comparator.comparing(LSMTimeline::getMaxInstantTime).reversed());
       }
 
       Schema readSchema = LSMTimeline.getReadSchema(loadMode);
       // Use serial stream when limit is involved to guarantee order
-      java.util.stream.Stream<String> fileStream = hasLimit
+      Stream<String> fileStream = hasLimit
           ? filteredFiles.stream()
           : filteredFiles.parallelStream();
-      fileStream.forEach(fileName -> {
+      return fileStream.map(fileName -> {
         if (hasLimit && loadedCount.get() >= limit.get()) {
-          return;
+          return null;
         }
         // Read the archived file
         try (HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(metaClient.getStorage())
             .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
             .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, new 
StoragePath(metaClient.getArchivePath(), fileName))) {
           //TODO boundary to revisit in later pr to use HoodieSchema directly
           try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(HoodieSchema.fromAvroSchema(HoodieLSMTimelineInstant.getClassSchema()),
-                  HoodieSchema.fromAvroSchema(readSchema))) {            
+                  HoodieSchema.fromAvroSchema(readSchema))) {
+            int instantTimeFieldPosition = 
readSchema.getField(INSTANT_TIME_ARCHIVED_META_FIELD).pos();
             while (iterator.hasNext() && (!hasLimit || loadedCount.get() < 
limit.get())) {
               GenericRecord record = (GenericRecord) iterator.next();
-              String instantTime = 
record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString();
-              if ((filter == null || filter.isInRange(instantTime))
-                  && commitsFilter.apply(record)) {
+              String instantTime = 
record.get(instantTimeFieldPosition).toString();
+              if (commitsFilter.apply(record)) {
                 recordConsumer.accept(instantTime, record);
                 if (hasLimit) {
                   loadedCount.incrementAndGet();
                 }
               }
             }
           }
+          return LSMTimeline.getMinInstantTime(fileName);
         } catch (IOException ioException) {
           throw new HoodieIOException("Error open file reader for path: "
               + new StoragePath(metaClient.getArchivePath(), fileName));
         }
-      });
+      
}).filter(Objects::nonNull).reduce(InstantComparison::minTimestamp).orElse(null);

Review Comment:
   I've updated the interface to return an option instead, so it is clearer 
to the caller.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to