nsivabalan commented on code in PR #14261: URL: https://github.com/apache/hudi/pull/14261#discussion_r2551202180
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/StoppableRecordConsumer.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.timeline; + +import org.apache.avro.generic.GenericRecord; + +import java.util.function.BiConsumer; + +/** + * Marker interface for BiConsumers that support early termination during timeline loading. + * + * <p>When a consumer implements this interface, the timeline loader can optimize by: + * <ul> + * <li>Stopping file reading when {@link #shouldStop()} returns true</li> + * <li>Sorting files in reverse chronological order if {@link #needsReverseOrder()} returns true</li> + * </ul> + * + * <p>This is particularly useful for limit-based queries where we only need the N most recent instants. + * The loader will check {@link #shouldStop()} between files and stop reading once the limit is reached, + * avoiding unnecessary I/O and decoding of records. 
+ */ +public interface StoppableRecordConsumer extends BiConsumer<String, GenericRecord> { Review Comment: LimitingRecordConsumer or BoundedRecordConsumer ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java: ########## @@ -218,6 +219,34 @@ record -> { }); } + @Override + public void loadCompactionDetailsInMemory(int limit) { + loadInstantsWithLimit(limit, true, + record -> { + Object action = record.get(ACTION_STATE); + return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION) + && (action == null || org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(action.toString())); Review Comment: is the action expected to be null for a completed entry? ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineLoaderV2.java: ########## @@ -56,29 +59,58 @@ public void loadInstants(HoodieTableMetaClient metaClient, // List all files List<String> fileNames = LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath()).getFileNames(); + // Check if consumer supports early termination + StoppableRecordConsumer stoppable = recordConsumer instanceof StoppableRecordConsumer + ? 
(StoppableRecordConsumer) recordConsumer + : null; + + // Filter files by time range + List<String> filteredFiles = new ArrayList<>(); + for (String fileName : fileNames) { + if (filter == null || LSMTimeline.isFileInRange(filter, fileName)) { + filteredFiles.add(fileName); + } + } + + // Sort files in reverse chronological order if needed (newest first for limit queries) + if (stoppable != null && stoppable.needsReverseOrder()) { + filteredFiles.sort(Comparator.comparing((String fileName) -> { + try { + return LSMTimeline.getMaxInstantTime(fileName); + } catch (Exception e) { + return ""; + } + }).reversed()); + } + Schema readSchema = LSMTimeline.getReadSchema(loadMode); - fileNames.stream() - .filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName)) - .parallel().forEach(fileName -> { - // Read the archived file - try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(metaClient.getStorage()) - .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) - .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, new StoragePath(metaClient.getArchivePath(), fileName))) { - try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) { - while (iterator.hasNext()) { - GenericRecord record = (GenericRecord) iterator.next(); - String instantTime = record.get(INSTANT_TIME_ARCHIVED_META_FIELD).toString(); - if ((filter == null || filter.isInRange(instantTime)) - && commitsFilter.apply(record)) { - recordConsumer.accept(instantTime, record); - } - } + filteredFiles.parallelStream().forEach(fileName -> { + if (stoppable != null && stoppable.shouldStop()) { Review Comment: if we are doing parallel stream, we can't guarantee latest N with limit N right? 
there could be non-continuous entries ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java: ########## @@ -249,6 +278,19 @@ private List<HoodieInstant> loadInstants(HoodieArchivedTimeline.TimeRangeFilter .stream().flatMap(Collection::stream).sorted().collect(Collectors.toList()); } + private void loadInstantsWithLimit(int limit, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter) { + InstantsLoaderWithLimit loader = new InstantsLoaderWithLimit(loadInstantDetails, limit); + timelineLoader.loadInstants( + metaClient, null, Option.empty(), LoadMode.PLAN, commitsFilter, loader); + List<HoodieInstant> collectedInstants = loader.getCollectedInstants(); + List<HoodieInstant> newInstants = collectedInstants.stream() + .filter(instant -> !getInstants().contains(instant)) Review Comment: can we do `getInstants()` once outside the loop and use it here. currently, this will result in N array lists ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -29,10 +29,16 @@ public interface HoodieArchivedTimeline extends HoodieTimeline { void loadCompletedInstantDetailsInMemory(); + void loadCompletedInstantDetailsInMemory(String startTs, String endTs); Review Comment: do we have UTs for these? ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineLoaderV2.java: ########## @@ -56,29 +59,58 @@ public void loadInstants(HoodieTableMetaClient metaClient, // List all files List<String> fileNames = LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath()).getFileNames(); + // Check if consumer supports early termination + StoppableRecordConsumer stoppable = recordConsumer instanceof StoppableRecordConsumer Review Comment: "stoppable" is not a good naming. 
let's try to find a different name ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java: ########## @@ -274,6 +316,54 @@ public Map<String, List<HoodieInstant>> getInstantsInRangeCollected() { } } + public class InstantsLoaderWithLimit implements StoppableRecordConsumer { + private final Map<String, List<HoodieInstant>> instantsInRange = new ConcurrentHashMap<>(); + private final boolean loadInstantDetails; + private final int limit; + private volatile int loadedCount = 0; + + private InstantsLoaderWithLimit(boolean loadInstantDetails, int limit) { + this.loadInstantDetails = loadInstantDetails; + this.limit = limit; + } + + @Override + public boolean shouldStop() { + return loadedCount >= limit; + } + + @Override + public void accept(String instantTime, GenericRecord record) { + if (shouldStop()) { + return; + } + Option<HoodieInstant> instant = readCommit(instantTime, record, loadInstantDetails, null); + if (instant.isPresent()) { + synchronized (this) { + if (loadedCount < limit) { + instantsInRange.computeIfAbsent(instant.get().requestedTime(), s -> new ArrayList<>()) + .add(instant.get()); + loadedCount++; + } + } + } + } + + public Map<String, List<HoodieInstant>> getInstantsInRangeCollected() { + return instantsInRange; + } + + public List<HoodieInstant> getCollectedInstants() { + // V1 needs to flatten because the map values are lists, while V2 can use the values directly. + // V1 can have multiple instants with the same timestamp but different states (REQUESTED, INFLIGHT, COMPLETED). + return instantsInRange.values() + .stream() + .flatMap(Collection::stream) + .sorted() Review Comment: don't we need sorting based on instant times followed by states?
i.e. REQUESTED followed by INFLIGHT followed by COMPLETED ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java: ########## @@ -91,6 +96,10 @@ public void loadInstants(HoodieTableMetaClient metaClient, entryList.sort(new ArchiveFileVersionComparator()); for (StoragePathInfo fs : entryList) { + if (stoppable != null && stoppable.shouldStop()) { Review Comment: let's sync f2f on this. I have some questions/clarifications ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java: ########## @@ -274,6 +316,54 @@ public Map<String, List<HoodieInstant>> getInstantsInRangeCollected() { } } + public class InstantsLoaderWithLimit implements StoppableRecordConsumer { + private final Map<String, List<HoodieInstant>> instantsInRange = new ConcurrentHashMap<>(); + private final boolean loadInstantDetails; + private final int limit; + private volatile int loadedCount = 0; + + private InstantsLoaderWithLimit(boolean loadInstantDetails, int limit) { + this.loadInstantDetails = loadInstantDetails; + this.limit = limit; + } + + @Override + public boolean shouldStop() { + return loadedCount >= limit; + } + + @Override + public void accept(String instantTime, GenericRecord record) { + if (shouldStop()) { + return; + } + Option<HoodieInstant> instant = readCommit(instantTime, record, loadInstantDetails, null); + if (instant.isPresent()) { + synchronized (this) { Review Comment: do we need synchronized here? do we expect this to be called from multiple threads?
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineLoaderV2.java: ########## @@ -56,29 +59,58 @@ public void loadInstants(HoodieTableMetaClient metaClient, // List all files List<String> fileNames = LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath()).getFileNames(); + // Check if consumer supports early termination + StoppableRecordConsumer stoppable = recordConsumer instanceof StoppableRecordConsumer + ? (StoppableRecordConsumer) recordConsumer + : null; + + // Filter files by time range + List<String> filteredFiles = new ArrayList<>(); + for (String fileName : fileNames) { + if (filter == null || LSMTimeline.isFileInRange(filter, fileName)) { + filteredFiles.add(fileName); + } + } + + // Sort files in reverse chronological order if needed (newest first for limit queries) + if (stoppable != null && stoppable.needsReverseOrder()) { + filteredFiles.sort(Comparator.comparing((String fileName) -> { + try { + return LSMTimeline.getMaxInstantTime(fileName); + } catch (Exception e) { + return ""; Review Comment: why return EMPTY_STRING? ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java: ########## @@ -218,6 +219,34 @@ record -> { }); } + @Override + public void loadCompactionDetailsInMemory(int limit) { + loadInstantsWithLimit(limit, true, + record -> { + Object action = record.get(ACTION_STATE); + return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION) + && (action == null || org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(action.toString())); Review Comment: lets add a comment // Older files don't have action state set. 
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java: ########## @@ -242,6 +262,66 @@ private List<HoodieInstant> loadInstants( return result; } + /** + * Loads instants with a limit on the number of instants to load. + * This is used for limit-based loading where we only want to load the N most recent instants. + */ + private void loadInstantsWithLimit(int limit, HoodieArchivedTimeline.LoadMode loadMode, + Function<GenericRecord, Boolean> commitsFilter) { + InstantsLoaderWithLimit loader = new InstantsLoaderWithLimit(limit, loadMode); + timelineLoader.loadInstants(metaClient, null, loadMode, commitsFilter, loader); + List<HoodieInstant> collectedInstants = loader.getCollectedInstants(); + List<HoodieInstant> newInstants = collectedInstants.stream() + .filter(instant -> !getInstants().contains(instant)) + .collect(Collectors.toList()); + if (!newInstants.isEmpty()) { + appendInstants(newInstants); + } + } + + /** + * Callback to read instant details with a limit on the number of instants to load. + * Implements StoppableRecordConsumer to enable early termination and file sorting optimization. + */ + private class InstantsLoaderWithLimit implements StoppableRecordConsumer { Review Comment: lets avoid code duplication. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
