vinothchandar commented on code in PR #10255:
URL: https://github.com/apache/hudi/pull/10255#discussion_r1420410913


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java:
##########
@@ -22,9 +22,12 @@
 import org.apache.hudi.common.util.ValidationUtils;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * A instant commits range used for incremental reader filtering.

Review Comment:
   Rename this class to base instant range? Also, should this class be in 
a different package? Like "timeline"?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java:
##########
@@ -108,6 +108,9 @@ public static String instantTimePlusMillis(String 
timestamp, long milliseconds)
   public static String instantTimeMinusMillis(String timestamp, long 
milliseconds) {
     try {
       String timestampInMillis = fixInstantTimeCompatibility(timestamp);
+      if (timestampInMillis.length() < MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) 
{

Review Comment:
   Can you help me understand what's going on here?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>

Review Comment:
   Should we even allow this? It seems like invalid input to an incremental query. 
Can we simplify to just:
   ```
   [startTime, endTime]
   [startTime, _] (we assume _ is the latest time)
   [_, endTime] (we assume _ is the earliest time)
   ```
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param startTime    The start completion time.
+   * @param endTime      The end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTime(readTimeline, startTime, endTime, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  @VisibleForTesting
+  public List<String> getStartTime(
+      @Nullable String startTime,
+      @Nullable String endTime,
+      Function<String, String> earliestInstantTimeFunc) {
+    return 
getStartTime(metaClient.getCommitsTimeline().filterCompletedInstants(), 
startTime, endTime, InstantRange.RangeType.CLOSE_CLOSE, 
earliestInstantTimeFunc);
   }
 
   /**
    * Queries the instant start time with given completion time range.
    *
-   * @param startCompletionTime   The start completion time.
-   * @param endCompletionTime     The end completion time.
-   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
-   *                              with the minimum completion time {@code 
startCompletionTime}.
+   * @param readTimeline            The read timeline.
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param rangeType               The range type.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
-    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType,
+      Function<String, String> earliestInstantTimeFunc) {
+    final boolean startFromEarliest = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime);
+    String earliestInstantToLoad = null;
+    if (startTime != null && !startFromEarliest) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(startTime);
+    } else if (endTime != null) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(endTime);
+    }
+
+    // ensure the earliest instant boundary be loaded.
+    if (earliestInstantToLoad != null && 
HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
earliestInstantToLoad)) {
+      loadCompletionTimeIncrementally(earliestInstantToLoad);
+    }
+
+    if (startTime == null && endTime != null) {
+      // returns the last instant that finished at or before the given 
completion time 'endTime'.
+      String maxInstantTime = readTimeline.getInstantsAsStream()
+          .filter(instant -> instant.isCompleted() && 
HoodieTimeline.compareTimestamps(instant.getCompletionTime(), 
LESSER_THAN_OR_EQUALS, endTime))
+          
.max(Comparator.comparing(HoodieInstant::getCompletionTime)).map(HoodieInstant::getTimestamp).orElse(null);
+      if (maxInstantTime != null) {
+        return Collections.singletonList(maxInstantTime);
+      }
+      // fallback to archived timeline
+      return this.startToCompletionInstantTimeMap.entrySet().stream()
+          .filter(entry -> HoodieTimeline.compareTimestamps(entry.getValue(), 
LESSER_THAN_OR_EQUALS, endTime))
+          .map(Map.Entry::getKey).collect(Collectors.toList());
+    }
+
+    if (startFromEarliest) {
+      // expedience for snapshot read: ['earliest', _) to avoid loading 
unnecessary instants.
+      startTime = null;
+    }
+
+    if (startTime == null && endTime == null) {
+      return readTimeline.filterCompletedInstants().lastInstant().map(instant 
-> 
Collections.singletonList(instant.getTimestamp())).orElse(Collections.emptyList());

Review Comment:
   This is basically reading the latest snapshot — add a comment?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param startTime    The start completion time.
+   * @param endTime      The end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTime(readTimeline, startTime, endTime, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  @VisibleForTesting
+  public List<String> getStartTime(

Review Comment:
   Should we standardize to one thing - begin instant time vs start time? I 
prefer "begin". What does the rest of the code use?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTim;
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A range type is required for analyzing the query so that the query 
range boundary inclusiveness have clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files missing from decoding the commit metadata.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final String startTime;
+  private final String endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {
+    try (CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(this.metaClient)) {
+      if (completionTimeQueryView.isEmptyTable()) {
+        // no dataset committed in the table
+        return QueryContext.EMPTY;
+      }
+      HoodieTimeline readTimeline = getReadTimeline(this.metaClient);
+      List<String> instantTimeList = 
completionTimeQueryView.getStartTime(readTimeline, startTime, endTime, 
rangeType);
+      if (instantTimeList.isEmpty()) {
+        // no instants completed within the give time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      // get hoodie instants
+      Pair<List<String>, List<String>> splitInstantTime = 
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+      Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+      List<String> archivedInstantTime = splitInstantTime.getKey();
+      List<String> activeInstantTime = splitInstantTime.getValue();
+      List<HoodieInstant> archivedInstants = new ArrayList<>();
+      List<HoodieInstant> activeInstants = new ArrayList<>();
+      HoodieTimeline archivedReadTimeline = null;
+      if (!activeInstantTime.isEmpty()) {
+        activeInstants = readTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+        if (limit > 0 && limit < activeInstants.size()) {
+          // streaming read speed limit, limits the maximum number of commits 
allowed to read for each run
+          activeInstants = activeInstants.subList(0, limit);
+        }
+      }
+      if (!archivedInstantTime.isEmpty()) {
+        archivedReadTimeline = getArchivedReadTimeline(metaClient, 
archivedInstantTime.get(0));
+        archivedInstants = 
archivedReadTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+      }
+      List<String> instants = Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        // no instants completed within the give time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      if (startTime == null && endTime != null) {
+        instants = Collections.singletonList(instants.get(instants.size() - 
1));
+      }
+      String lastInstant = instants.get(instants.size() - 1);
+      // keep the same semantics with streaming read, default start from the 
latest commit
+      String startInstant = START_COMMIT_EARLIEST.equalsIgnoreCase(startTime) 
? null : startTime == null ? lastInstant : instants.get(0);
+      String endInstant = endTime == null ? null : lastInstant;
+      return QueryContext.create(startInstant, endInstant, instants, 
archivedInstants, activeInstants, readTimeline, archivedReadTimeline);
+    }
+  }
+
+  /**
+   * Splits the given instant time list into a pair of archived instant list 
and active instant list.
+   */
+  private static Pair<List<String>, List<String>> 
splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView 
completionTimeQueryView) {
+    int firstActiveIdx = IntStream.range(0, instantTimeList.size()).filter(i 
-> 
!completionTimeQueryView.isArchived(instantTimeList.get(i))).findFirst().orElse(-1);
+    if (firstActiveIdx == -1) {
+      return Pair.of(instantTimeList, Collections.emptyList());
+    } else if (firstActiveIdx == 0) {
+      return Pair.of(Collections.emptyList(), instantTimeList);
+    } else {
+      return Pair.of(instantTimeList.subList(0, firstActiveIdx), 
instantTimeList.subList(firstActiveIdx, instantTimeList.size()));
+    }
+  }
+
+  private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
this.skipCompaction, this.skipClustering);
+  }
+
+  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
+    HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, 
archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+  }
+
+  /**
+   * Filters out the unnecessary instants as per user specified configs.
+   *
+   * @param timeline The timeline.
+   *
+   * @return the filtered timeline
+   */
+  @VisibleForTesting
+  public static HoodieTimeline 
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline 
timeline, boolean skipCompaction, boolean skipClustering) {
+    final HoodieTimeline oriTimeline = timeline;
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & 
skipCompaction) {
+      // the compaction commit uses 'commit' as action which is tricky
+      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    }
+    if (skipClustering) {
+      timeline = timeline.filter(instant -> 
!ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+    }
+    return timeline;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+  /**
+   * Builder for {@link IncrementalQueryAnalyzer}.
+   */
+  public static class Builder {
+    /**
+     * Start completion time.
+     */
+    private String startTime;
+    /**
+     * End completion time.
+     */
+    private String endTime;
+    private InstantRange.RangeType rangeType;
+    private HoodieTableMetaClient metaClient;
+    private boolean skipCompaction = false;
+    private boolean skipClustering = false;
+    /**
+     * Maximum number of instants to read per run.
+     */
+    private int limit = -1;
+
+    public Builder() {
+    }
+
+    public Builder startTime(String startTime) {
+      this.startTime = startTime;
+      return this;
+    }
+
+    public Builder endTime(String endTime) {
+      this.endTime = endTime;
+      return this;
+    }
+
+    public Builder rangeType(InstantRange.RangeType rangeType) {
+      this.rangeType = rangeType;
+      return this;
+    }
+
+    public Builder metaClient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
+    public Builder skipCompaction(boolean skipCompaction) {

Review Comment:
   From a user standpoint, all of this should be transparent, right?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param startTime    The start completion time.
+   * @param endTime      The end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTime(readTimeline, startTime, endTime, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  @VisibleForTesting
+  public List<String> getStartTime(

Review Comment:
   rename: getStartTimes



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param startTime    The start completion time.
+   * @param endTime      The end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTime(readTimeline, startTime, endTime, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  @VisibleForTesting
+  public List<String> getStartTime(
+      @Nullable String startTime,
+      @Nullable String endTime,
+      Function<String, String> earliestInstantTimeFunc) {
+    return 
getStartTime(metaClient.getCommitsTimeline().filterCompletedInstants(), 
startTime, endTime, InstantRange.RangeType.CLOSE_CLOSE, 
earliestInstantTimeFunc);
   }
 
   /**
    * Queries the instant start time with given completion time range.
    *
-   * @param startCompletionTime   The start completion time.
-   * @param endCompletionTime     The end completion time.
-   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
-   *                              with the minimum completion time {@code 
startCompletionTime}.
+   * @param readTimeline            The read timeline.
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param rangeType               The range type.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
-    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType,
+      Function<String, String> earliestInstantTimeFunc) {
+    final boolean startFromEarliest = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime);
+    String earliestInstantToLoad = null;
+    if (startTime != null && !startFromEarliest) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(startTime);
+    } else if (endTime != null) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(endTime);
+    }
+
+    // ensure the earliest instant boundary be loaded.
+    if (earliestInstantToLoad != null && 
HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
earliestInstantToLoad)) {
+      loadCompletionTimeIncrementally(earliestInstantToLoad);
+    }
+
+    if (startTime == null && endTime != null) {

Review Comment:
   Let's please use Option and not `null` to do these.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param startTime    The start completion time.
+   * @param endTime      The end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTime(readTimeline, startTime, endTime, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  @VisibleForTesting
+  public List<String> getStartTime(
+      @Nullable String startTime,
+      @Nullable String endTime,
+      Function<String, String> earliestInstantTimeFunc) {
+    return 
getStartTime(metaClient.getCommitsTimeline().filterCompletedInstants(), 
startTime, endTime, InstantRange.RangeType.CLOSE_CLOSE, 
earliestInstantTimeFunc);
   }
 
   /**
    * Queries the instant start time with given completion time range.
    *
-   * @param startCompletionTime   The start completion time.
-   * @param endCompletionTime     The end completion time.
-   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
-   *                              with the minimum completion time {@code 
startCompletionTime}.
+   * @param readTimeline            The read timeline.
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param rangeType               The range type.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
-    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType,
+      Function<String, String> earliestInstantTimeFunc) {
+    final boolean startFromEarliest = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime);
+    String earliestInstantToLoad = null;
+    if (startTime != null && !startFromEarliest) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(startTime);
+    } else if (endTime != null) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(endTime);
+    }
+
+    // ensure the earliest instant boundary be loaded.
+    if (earliestInstantToLoad != null && 
HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
earliestInstantToLoad)) {
+      loadCompletionTimeIncrementally(earliestInstantToLoad);
+    }
+
+    if (startTime == null && endTime != null) {
+      // returns the last instant that finished at or before the given 
completion time 'endTime'.
+      String maxInstantTime = readTimeline.getInstantsAsStream()
+          .filter(instant -> instant.isCompleted() && 
HoodieTimeline.compareTimestamps(instant.getCompletionTime(), 
LESSER_THAN_OR_EQUALS, endTime))
+          
.max(Comparator.comparing(HoodieInstant::getCompletionTime)).map(HoodieInstant::getTimestamp).orElse(null);
+      if (maxInstantTime != null) {
+        return Collections.singletonList(maxInstantTime);
+      }
+      // fallback to archived timeline
+      return this.startToCompletionInstantTimeMap.entrySet().stream()

Review Comment:
   Why don't we return the max from this code path?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTim;
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>

Review Comment:
   I feel it'd be good to write code against a single global timeline 
abstraction instead of this archived/active divide.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java:
##########
@@ -60,8 +63,8 @@ public String getEndInstant() {
   @Override
   public String toString() {
     return "InstantRange{"
-        + "startInstant='" + startInstant == null ? "null" : startInstant + 
'\''
-        + ", endInstant='" + endInstant == null ? "null" : endInstant + '\''
+        + "startInstant='" + (startInstant == null ? "null" : startInstant) + 
'\''
+        + ", endInstant='" + (endInstant == null ? "null" : endInstant) + '\''
         + ", rangeType='" + this.getClass().getSimpleName() + '\''

Review Comment:
   Could we use the range type enum name here? That would be cleaner than printing the 
class name.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.

Review Comment:
   What's special about the timeline being passed in?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTim;
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A range type is required for analyzing the query so that the query 
range boundary inclusiveness have clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files missing from decoding the commit metadata.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final String startTime;
+  private final String endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {

Review Comment:
   I have yet to review this.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param startTime    The start completion time.
+   * @param endTime      The end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTime(readTimeline, startTime, endTime, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  @VisibleForTesting
+  public List<String> getStartTime(
+      @Nullable String startTime,
+      @Nullable String endTime,
+      Function<String, String> earliestInstantTimeFunc) {
+    return 
getStartTime(metaClient.getCommitsTimeline().filterCompletedInstants(), 
startTime, endTime, InstantRange.RangeType.CLOSE_CLOSE, 
earliestInstantTimeFunc);
   }
 
   /**
    * Queries the instant start time with given completion time range.
    *
-   * @param startCompletionTime   The start completion time.
-   * @param endCompletionTime     The end completion time.
-   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
-   *                              with the minimum completion time {@code 
startCompletionTime}.
+   * @param readTimeline            The read timeline.
+   * @param startTime               The start completion time.
+   * @param endTime                 The end completion time.
+   * @param rangeType               The range type.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
-    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+  public List<String> getStartTime(
+      HoodieTimeline readTimeline,
+      @Nullable String startTime,
+      @Nullable String endTime,
+      InstantRange.RangeType rangeType,
+      Function<String, String> earliestInstantTimeFunc) {
+    final boolean startFromEarliest = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime);
+    String earliestInstantToLoad = null;
+    if (startTime != null && !startFromEarliest) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(startTime);
+    } else if (endTime != null) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(endTime);
+    }
+
+    // ensure the earliest instant boundary be loaded.
+    if (earliestInstantToLoad != null && 
HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
earliestInstantToLoad)) {
+      loadCompletionTimeIncrementally(earliestInstantToLoad);
+    }
+
+    if (startTime == null && endTime != null) {
+      // returns the last instant that finished at or before the given 
completion time 'endTime'.
+      String maxInstantTime = readTimeline.getInstantsAsStream()
+          .filter(instant -> instant.isCompleted() && 
HoodieTimeline.compareTimestamps(instant.getCompletionTime(), 
LESSER_THAN_OR_EQUALS, endTime))
+          
.max(Comparator.comparing(HoodieInstant::getCompletionTime)).map(HoodieInstant::getTimestamp).orElse(null);
+      if (maxInstantTime != null) {
+        return Collections.singletonList(maxInstantTime);
+      }
+      // fallback to archived timeline
+      return this.startToCompletionInstantTimeMap.entrySet().stream()
+          .filter(entry -> HoodieTimeline.compareTimestamps(entry.getValue(), 
LESSER_THAN_OR_EQUALS, endTime))
+          .map(Map.Entry::getKey).collect(Collectors.toList());
+    }
+
+    if (startFromEarliest) {
+      // expedience for snapshot read: ['earliest', _) to avoid loading 
unnecessary instants.
+      startTime = null;

Review Comment:
   Should this happen before?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTim;
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A range type is required for analyzing the query so that the query 
range boundary inclusiveness have clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files missing from decoding the commit metadata.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final String startTime;
+  private final String endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {
+    try (CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(this.metaClient)) {
+      if (completionTimeQueryView.isEmptyTable()) {
+        // no dataset committed in the table
+        return QueryContext.EMPTY;
+      }
+      HoodieTimeline readTimeline = getReadTimeline(this.metaClient);
+      List<String> instantTimeList = 
completionTimeQueryView.getStartTime(readTimeline, startTime, endTime, 
rangeType);
+      if (instantTimeList.isEmpty()) {
+        // no instants completed within the give time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      // get hoodie instants
+      Pair<List<String>, List<String>> splitInstantTime = 
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+      Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+      List<String> archivedInstantTime = splitInstantTime.getKey();
+      List<String> activeInstantTime = splitInstantTime.getValue();
+      List<HoodieInstant> archivedInstants = new ArrayList<>();
+      List<HoodieInstant> activeInstants = new ArrayList<>();
+      HoodieTimeline archivedReadTimeline = null;
+      if (!activeInstantTime.isEmpty()) {
+        activeInstants = readTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+        if (limit > 0 && limit < activeInstants.size()) {
+          // streaming read speed limit, limits the maximum number of commits 
allowed to read for each run
+          activeInstants = activeInstants.subList(0, limit);
+        }
+      }
+      if (!archivedInstantTime.isEmpty()) {
+        archivedReadTimeline = getArchivedReadTimeline(metaClient, 
archivedInstantTime.get(0));
+        archivedInstants = 
archivedReadTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+      }
+      List<String> instants = Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        // no instants completed within the give time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      if (startTime == null && endTime != null) {
+        instants = Collections.singletonList(instants.get(instants.size() - 
1));
+      }
+      String lastInstant = instants.get(instants.size() - 1);
+      // keep the same semantics with streaming read, default start from the 
latest commit
+      String startInstant = START_COMMIT_EARLIEST.equalsIgnoreCase(startTime) 
? null : startTime == null ? lastInstant : instants.get(0);
+      String endInstant = endTime == null ? null : lastInstant;
+      return QueryContext.create(startInstant, endInstant, instants, 
archivedInstants, activeInstants, readTimeline, archivedReadTimeline);
+    }
+  }
+
+  /**
+   * Splits the given instant time list into a pair of archived instant list 
and active instant list.
+   */
+  private static Pair<List<String>, List<String>> 
splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView 
completionTimeQueryView) {
+    int firstActiveIdx = IntStream.range(0, instantTimeList.size()).filter(i 
-> 
!completionTimeQueryView.isArchived(instantTimeList.get(i))).findFirst().orElse(-1);
+    if (firstActiveIdx == -1) {
+      return Pair.of(instantTimeList, Collections.emptyList());
+    } else if (firstActiveIdx == 0) {
+      return Pair.of(Collections.emptyList(), instantTimeList);
+    } else {
+      return Pair.of(instantTimeList.subList(0, firstActiveIdx), 
instantTimeList.subList(firstActiveIdx, instantTimeList.size()));
+    }
+  }
+
+  private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
this.skipCompaction, this.skipClustering);
+  }
+
+  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
+    HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, 
archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+  }
+
+  /**
+   * Filters out the unnecessary instants as per user specified configs.
+   *
+   * @param timeline The timeline.
+   *
+   * @return the filtered timeline
+   */
+  @VisibleForTesting
+  public static HoodieTimeline 
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline 
timeline, boolean skipCompaction, boolean skipClustering) {
+    final HoodieTimeline oriTimeline = timeline;
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & 
skipCompaction) {
+      // the compaction commit uses 'commit' as action which is tricky
+      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    }
+    if (skipClustering) {
+      timeline = timeline.filter(instant -> 
!ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+    }
+    return timeline;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+  /**
+   * Builder for {@link IncrementalQueryAnalyzer}.
+   */
+  public static class Builder {
+    /**
+     * Start completion time.
+     */
+    private String startTime;
+    /**
+     * End completion time.
+     */
+    private String endTime;
+    private InstantRange.RangeType rangeType;
+    private HoodieTableMetaClient metaClient;
+    private boolean skipCompaction = false;
+    private boolean skipClustering = false;
+    /**
+     * Maximum number of instants to read per run.
+     */
+    private int limit = -1;
+
+    public Builder() {
+    }
+
+    public Builder startTime(String startTime) {
+      this.startTime = startTime;
+      return this;
+    }
+
+    public Builder endTime(String endTime) {
+      this.endTime = endTime;
+      return this;
+    }
+
+    public Builder rangeType(InstantRange.RangeType rangeType) {
+      this.rangeType = rangeType;
+      return this;
+    }
+
+    public Builder metaClient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
+    public Builder skipCompaction(boolean skipCompaction) {

Review Comment:
   Why are these needed? Are they for table services and other internal processes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to