nsivabalan commented on code in PR #14261:
URL: https://github.com/apache/hudi/pull/14261#discussion_r2579119997


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -236,12 +272,27 @@ private List<HoodieInstant> loadInstants(
     Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
     Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer = Option.ofNullable(getInstantDetailsFunc(loadMode));
     timelineLoader.loadInstants(metaClient, filter, loadMode, commitsFilter,
-        (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime, avroRecord, instantDetailsConsumer)));
+        (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime, avroRecord, instantDetailsConsumer)), -1);
     List<HoodieInstant> result = new ArrayList<>(instantsInRange.values());
     Collections.sort(result);
     return result;
   }
 
+  /**
+   * Loads instants with a limit on the number of instants to load.
+   * This is used for limit-based loading where we only want to load the N most recent instants.
+   */
+  private void loadInstantsWithLimit(int limit, HoodieArchivedTimeline.LoadMode loadMode,
+      Function<GenericRecord, Boolean> commitsFilter) {
+    Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
+    Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer = Option.ofNullable(getInstantDetailsFunc(loadMode));
+    timelineLoader.loadInstants(metaClient, null, loadMode, commitsFilter,
+        (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime, avroRecord, instantDetailsConsumer)), limit);
+    List<HoodieInstant> collectedInstants = new ArrayList<>(instantsInRange.values());
+    Collections.sort(collectedInstants);
+    appendLoadedInstants(collectedInstants);

Review Comment:
   This differs from what we saw in the v1 archived timeline.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java:
##########
@@ -114,6 +114,22 @@ protected void appendInstants(List<HoodieInstant> newInstants) {
     clearState();
   }
 
+  /**
+   * Helper method to append loaded instants to the timeline, filtering out duplicates.
+   * This is used by both time-range and limit-based loading to avoid code duplication.
+   *
+   * @param loadedInstants The list of instants that were loaded to readCommit field of timeline
+   */
+  protected void appendLoadedInstants(List<HoodieInstant> loadedInstants) {

Review Comment:
   we can name this `appendInstants` 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/ArchivedTimelineLoader.java:
##########
@@ -45,4 +45,25 @@ void loadInstants(
       HoodieArchivedTimeline.LoadMode loadMode,
       Function<GenericRecord, Boolean> commitsFilter,
       BiConsumer<String, GenericRecord> recordConsumer);
+
+  /**
+   * Loads the instants from the timeline with optional limit for early termination.
+   *
+   * @param metaClient     The meta client.
+   * @param filter         The time range filter where the target instant belongs to.
+   * @param loadMode       The load mode.
+   * @param commitsFilter  Filter of the instant type.
+   * @param recordConsumer Consumer of the instant record payload.
+   * @param limit          Maximum number of instants to load. Use -1 for no limit.

Review Comment:
   If we describe the limit as "optional", then the argument should be `Option<Integer>` instead of using `-1` to mean "no limit".
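A minimal sketch of what an `Option`-typed limit could look like. It assumes `java.util.Optional` as a stand-in for Hudi's own `Option` type (used elsewhere in this diff) so the example runs without Hudi on the classpath; the `loadInstants` helper and the `Iterator<String>` record source are hypothetical simplifications, not the real `ArchivedTimelineLoader` signature.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

public class OptionalLimitSketch {

  // Hypothetical, simplified stand-in for ArchivedTimelineLoader#loadInstants.
  // java.util.Optional replaces Hudi's Option so the sketch runs standalone.
  public static List<String> loadInstants(Iterator<String> records, Optional<Integer> limit) {
    // Optional.empty() means "no limit"; no sentinel value like -1 is needed.
    int max = limit.orElse(Integer.MAX_VALUE);
    List<String> loaded = new ArrayList<>();
    while (records.hasNext() && loaded.size() < max) {
      loaded.add(records.next());
    }
    return loaded;
  }

  public static void main(String[] args) {
    List<String> instants = List.of("001", "002", "003", "004");
    System.out.println(loadInstants(instants.iterator(), Optional.of(2)));   // [001, 002]
    System.out.println(loadInstants(instants.iterator(), Optional.empty())); // [001, 002, 003, 004]
  }
}
```

With an empty `Option` meaning "no limit", callers would not need to know about a magic `-1`, and an unlimited call path could simply pass an empty value.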



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java:
##########
@@ -100,6 +119,9 @@ public void loadInstants(HoodieTableMetaClient metaClient,
           int instantsInPreviousFile = instantsInRange.size();
           // Read the avro blocks
           while (reader.hasNext()) {
+            if (hasLimit && loadedCount.get() >= limit) {

Review Comment:
   We can move this check into the `while` condition itself.
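A minimal sketch of the suggested loop shape, with the limit check folded into the `while` condition instead of an early exit at the top of the body. It assumes the reader can be modelled as a plain `Iterator<String>` and that a negative limit means "no limit" (matching the `-1` convention in the Javadoc of `ArchivedTimelineLoader`); `readBlocks` is a hypothetical name, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class WhileLimitSketch {

  // Hypothetical stand-in for the block-reading loop in ArchivedTimelineLoaderV1.
  // A negative limit means "no limit", mirroring the -1 convention in the PR.
  public static List<String> readBlocks(Iterator<String> reader, int limit, AtomicInteger loadedCount) {
    boolean hasLimit = limit >= 0;
    List<String> loaded = new ArrayList<>();
    // The limit check sits in the loop condition, so no early break is needed in the body.
    while (reader.hasNext() && (!hasLimit || loadedCount.get() < limit)) {
      loaded.add(reader.next());
      loadedCount.incrementAndGet();
    }
    return loaded;
  }

  public static void main(String[] args) {
    AtomicInteger loadedCount = new AtomicInteger();
    // The counter is shared across calls, e.g. across several archive files.
    System.out.println(readBlocks(List.of("b1", "b2", "b3").iterator(), 2, loadedCount)); // [b1, b2]
    System.out.println(readBlocks(List.of("b4").iterator(), 2, loadedCount));             // []
  }
}
```

Passing `loadedCount` in, rather than creating it per call, lets the limit carry across several archive files: in this sketch, a second call with an exhausted counter reads nothing.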



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java:
##########
@@ -209,13 +222,44 @@ public void loadCompactionDetailsInMemory(String compactionInstantTime) {
   @Override
   public void loadCompactionDetailsInMemory(String startTs, String endTs) {
     // load compactionPlan
-    loadInstants(new ClosedClosedTimeRangeFilter(startTs, endTs), null, true,
+    List<HoodieInstant> loadedInstants = loadInstants(new ClosedClosedTimeRangeFilter(startTs, endTs), null, true,
         record -> {
           // Older files don't have action state set.
           Object action = record.get(ACTION_STATE);
           return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
               && (action == null || org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(action.toString()));
         });
+    appendLoadedInstants(loadedInstants);
+  }
+
+  @Override
+  public void loadCompactionDetailsInMemory(int limit) {
+    loadInstantsWithLimit(limit, true,
+        record -> {
+          Object actionState = record.get(ACTION_STATE);
+          // Older files & archivedTimelineV2 don't have action state set.
+          return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
+              && (actionState == null || org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(actionState.toString()));
+        });

Review Comment:
   So, for the limit-based APIs it's streaming, while for the start/end-time-based APIs it's a cached response, is it?
   Is that an intentional choice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.