codope commented on code in PR #10255:
URL: https://github.com/apache/hudi/pull/10255#discussion_r1439195459


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>

Review Comment:
   What is `earliest` here? Is it the earliest instant in the active timeline, or 
does it also include the archived timeline? Also, it is not clear from the doc 
why the instant filtering predicate is empty.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java:
##########
@@ -72,67 +76,77 @@
 public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
 
   @BeforeEach
-  void init() throws IOException {
+  void init() {
     initPath();
-    initMetaClient();
   }
 
   @Test
-  void testFilterInstantsWithRange() {
-    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
+  void testFilterInstantsWithRange() throws IOException {
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
-    IncrementalInputSplits iis = IncrementalInputSplits.builder()
-        .conf(conf)
-        .path(new Path(basePath))
-        .rowType(TestConfigurations.ROW_TYPE)
-        
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
-        .build();
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
 
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "1");
     HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "2");
     HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "3");
     timeline.createCompleteInstant(commit1);
     timeline.createCompleteInstant(commit2);
     timeline.createCompleteInstant(commit3);
-    timeline = timeline.reload();
+    timeline = metaClient.reloadActiveTimeline();
+
+    Map<String, String> completionTimeMap = 
timeline.filterCompletedInstants().getInstantsAsStream()
+        .collect(Collectors.toMap(HoodieInstant::getTimestamp, 
HoodieInstant::getCompletionTime));
 
+    IncrementalQueryAnalyzer analyzer1 = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .rangeType(InstantRange.RangeType.OPEN_CLOSE)
+        .startTime(completionTimeMap.get("1"))
+        .skipClustering(true)
+        .build();
     // previous read iteration read till instant time "1", next read iteration 
should return ["2", "3"]
-    List<HoodieInstant> instantRange2 = iis.filterInstantsWithRange(timeline, 
"1");
-    assertEquals(2, instantRange2.size());
-    assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);
+    List<HoodieInstant> activeInstants1 = 
analyzer1.analyze().getActiveInstants();
+    assertEquals(2, activeInstants1.size());
+    assertIterableEquals(Arrays.asList(commit2, commit3), activeInstants1);
 
     // simulate first iteration cycle with read from the LATEST commit
-    List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline, 
null);
-    assertEquals(1, instantRange1.size());
-    assertIterableEquals(Collections.singletonList(commit3), instantRange1);
+    IncrementalQueryAnalyzer analyzer2 = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
+        .skipClustering(true)
+        .build();
+    List<HoodieInstant> activeInstants2 = 
analyzer2.analyze().getActiveInstants();
+    assertEquals(1, activeInstants2.size());
+    assertIterableEquals(Collections.singletonList(commit3), activeInstants2);
 
     // specifying a start and end commit
-    conf.set(FlinkOptions.READ_START_COMMIT, "1");
-    conf.set(FlinkOptions.READ_END_COMMIT, "3");
-    List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline, 
null);
-    assertEquals(3, instantRange3.size());
-    assertIterableEquals(Arrays.asList(commit1, commit2, commit3), 
instantRange3);
+    IncrementalQueryAnalyzer analyzer3 = IncrementalQueryAnalyzer.builder()

Review Comment:
   Let's also test for null/empty start and end times.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed on or after 'startTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A range type is required for analyzing the query so that the query 
range boundary inclusiveness has clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files missing from decoding the commit metadata.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final Option<String> startTime;
+  private final Option<String> endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = Option.ofNullable(startTime);
+    this.endTime = Option.ofNullable(endTime);
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {
+    try (CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(this.metaClient)) {
+      if (completionTimeQueryView.isEmptyTable()) {
+        // no dataset committed in the table
+        return QueryContext.EMPTY;
+      }
+      HoodieTimeline readTimeline = getReadTimeline(this.metaClient);
+      List<String> instantTimeList = 
completionTimeQueryView.getStartTimes(readTimeline, startTime, endTime, 
rangeType);
+      if (instantTimeList.isEmpty()) {
+        // no instants completed within the given time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      // get hoodie instants
+      Pair<List<String>, List<String>> splitInstantTime = 
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+      Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+      List<String> archivedInstantTime = splitInstantTime.getKey();
+      List<String> activeInstantTime = splitInstantTime.getValue();
+      List<HoodieInstant> archivedInstants = new ArrayList<>();
+      List<HoodieInstant> activeInstants = new ArrayList<>();
+      HoodieTimeline archivedReadTimeline = null;
+      if (!activeInstantTime.isEmpty()) {
+        activeInstants = readTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+        if (limit > 0 && limit < activeInstants.size()) {
+          // streaming read speed limit, limits the maximum number of commits 
allowed to read for each run
+          activeInstants = activeInstants.subList(0, limit);
+        }
+      }
+      if (!archivedInstantTime.isEmpty()) {
+        archivedReadTimeline = getArchivedReadTimeline(metaClient, 
archivedInstantTime.get(0));
+        archivedInstants = 
archivedReadTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+      }
+      List<String> instants = Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        // no instants completed within the given time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      if (startTime.isEmpty() && endTime.isPresent()) {
+        instants = Collections.singletonList(instants.get(instants.size() - 
1));
+      }
+      String lastInstant = instants.get(instants.size() - 1);
+      // keep the same semantics with streaming read, default start from the 
latest commit
+      String startInstant = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime.orElse(null)) ? null : 
startTime.isEmpty() ? lastInstant : instants.get(0);
+      String endInstant = endTime.isEmpty() ? null : lastInstant;
+      return QueryContext.create(startInstant, endInstant, instants, 
archivedInstants, activeInstants, readTimeline, archivedReadTimeline);
+    }
+  }
+
+  /**
+   * Splits the given instant time list into a pair of archived instant list 
and active instant list.
+   */
+  private static Pair<List<String>, List<String>> 
splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView 
completionTimeQueryView) {
+    int firstActiveIdx = IntStream.range(0, instantTimeList.size()).filter(i 
-> 
!completionTimeQueryView.isArchived(instantTimeList.get(i))).findFirst().orElse(-1);
+    if (firstActiveIdx == -1) {
+      return Pair.of(instantTimeList, Collections.emptyList());
+    } else if (firstActiveIdx == 0) {
+      return Pair.of(Collections.emptyList(), instantTimeList);
+    } else {
+      return Pair.of(instantTimeList.subList(0, firstActiveIdx), 
instantTimeList.subList(firstActiveIdx, instantTimeList.size()));
+    }
+  }
+
+  private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
this.skipCompaction, this.skipClustering);
+  }
+
+  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
+    HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, 
archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+  }
+
+  /**
+   * Filters out the unnecessary instants as per user specified configs.
+   *
+   * @param timeline The timeline.
+   *
+   * @return the filtered timeline
+   */
+  @VisibleForTesting
+  public static HoodieTimeline 
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline 
timeline, boolean skipCompaction, boolean skipClustering) {
+    final HoodieTimeline oriTimeline = timeline;
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & 
skipCompaction) {
+      // the compaction commit uses 'commit' as action which is tricky
+      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    }
+    if (skipClustering) {
+      timeline = timeline.filter(instant -> 
!ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+    }
+    return timeline;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+  /**
+   * Builder for {@link IncrementalQueryAnalyzer}.
+   */
+  public static class Builder {
+    /**
+     * Start completion time.
+     */
+    private String startTime;
+    /**
+     * End completion time.
+     */
+    private String endTime;
+    private InstantRange.RangeType rangeType;
+    private HoodieTableMetaClient metaClient;
+    private boolean skipCompaction = false;
+    private boolean skipClustering = false;
+    /**
+     * Maximum number of instants to read per run.
+     */
+    private int limit = -1;
+
+    public Builder() {
+    }
+
+    public Builder startTime(String startTime) {
+      this.startTime = startTime;
+      return this;
+    }
+
+    public Builder endTime(String endTime) {
+      this.endTime = endTime;
+      return this;
+    }
+
+    public Builder rangeType(InstantRange.RangeType rangeType) {
+      this.rangeType = rangeType;
+      return this;
+    }
+
+    public Builder metaClient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
+    public Builder skipCompaction(boolean skipCompaction) {
+      this.skipCompaction = skipCompaction;
+      return this;
+    }
+
+    public Builder skipClustering(boolean skipClustering) {
+      this.skipClustering = skipClustering;
+      return this;
+    }
+
+    public Builder limit(int limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    public IncrementalQueryAnalyzer build() {
+      return new 
IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), 
this.startTime, this.endTime,
+          Objects.requireNonNull(this.rangeType), this.skipCompaction, 
this.skipClustering, this.limit);
+    }
+  }
+
+  /**
+   * Represents the analyzed query context.
+   */
+  public static class QueryContext {
+    public static final QueryContext EMPTY = new QueryContext(null, null, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 
null, null);
+
+    /**
+     * An empty option indicates consumption from the earliest instant.
+     */
+    private final Option<String> startInstant;
+    /**
+     * An empty option indicates consumption to the latest instant.
+     */
+    private final Option<String> endInstant;
+    private final List<HoodieInstant> archivedInstants;
+    private final List<HoodieInstant> activeInstants;
+    /**
+     * The active timeline to read filtered by given configurations.
+     */
+    private final HoodieTimeline readTimeline;
+    /**
+     * The archived timeline to read filtered by given configurations.
+     */
+    private final HoodieTimeline archivedReadTimeline;

Review Comment:
   `activeTimelineFiltered` and `archivedTimelineFiltered` sound more 
intuitive.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>

Review Comment:
   +1. Please file a followup and link it here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed on or after 'startTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A range type is required for analyzing the query so that the query 
range boundary inclusiveness has clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files missing from decoding the commit metadata.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final Option<String> startTime;
+  private final Option<String> endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = Option.ofNullable(startTime);
+    this.endTime = Option.ofNullable(endTime);
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {
+    try (CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(this.metaClient)) {
+      if (completionTimeQueryView.isEmptyTable()) {
+        // no dataset committed in the table
+        return QueryContext.EMPTY;
+      }
+      HoodieTimeline readTimeline = getReadTimeline(this.metaClient);
+      List<String> instantTimeList = 
completionTimeQueryView.getStartTimes(readTimeline, startTime, endTime, 
rangeType);
+      if (instantTimeList.isEmpty()) {
+        // no instants completed within the given time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      // get hoodie instants
+      Pair<List<String>, List<String>> splitInstantTime = 
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+      Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+      List<String> archivedInstantTime = splitInstantTime.getKey();
+      List<String> activeInstantTime = splitInstantTime.getValue();
+      List<HoodieInstant> archivedInstants = new ArrayList<>();
+      List<HoodieInstant> activeInstants = new ArrayList<>();
+      HoodieTimeline archivedReadTimeline = null;
+      if (!activeInstantTime.isEmpty()) {
+        activeInstants = readTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+        if (limit > 0 && limit < activeInstants.size()) {
+          // streaming read speed limit, limits the maximum number of commits 
allowed to read for each run
+          activeInstants = activeInstants.subList(0, limit);
+        }
+      }
+      if (!archivedInstantTime.isEmpty()) {
+        archivedReadTimeline = getArchivedReadTimeline(metaClient, 
archivedInstantTime.get(0));
+        archivedInstants = 
archivedReadTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+      }
+      List<String> instants = Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        // no instants completed within the given time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      if (startTime.isEmpty() && endTime.isPresent()) {
+        instants = Collections.singletonList(instants.get(instants.size() - 
1));
+      }
+      String lastInstant = instants.get(instants.size() - 1);
+      // keep the same semantics with streaming read, default start from the 
latest commit
+      String startInstant = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime.orElse(null)) ? null : 
startTime.isEmpty() ? lastInstant : instants.get(0);
+      String endInstant = endTime.isEmpty() ? null : lastInstant;
+      return QueryContext.create(startInstant, endInstant, instants, 
archivedInstants, activeInstants, readTimeline, archivedReadTimeline);
+    }
+  }
+
+  /**
+   * Splits the given instant time list into a pair of archived instant list 
and active instant list.
+   */
+  private static Pair<List<String>, List<String>> 
splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView 
completionTimeQueryView) {

Review Comment:
   `Pair` implements `Map.Entry`. I think for this use case, a simple POJO would 
suffice. Wdyt?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>

Review Comment:
   When can it happen that both startTime and endTime are null/empty? 
   Also, since this is a range, it looks slightly confusing to a user. I mean, 
if startTime is null/empty, should we not treat the earliest instant as the 
start of the range, just as we pick the latest instant as the end of the range 
when endTime is null/empty?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +190,109 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.

Review Comment:
   Let's rename this to `activeTimelineFiltered`? `readTimeline` is ambiguous: 
it raises the question of whether there is also a `writeTimeline`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTim;
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed before or on 'endTime';

Review Comment:
   I'm not following this doc. I think setA should be a collection of all the 
instants completed on or after `startTime`, right? In `[startTime, _]`, endTime 
is not defined, so the end of the range is basically the latest instant.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -516,7 +517,7 @@ public static boolean fileExists(FileSystem fs, Path path) {
   public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant 
instant, HoodieTimeline timeline) {
     return tableType == HoodieTableType.MERGE_ON_READ
         ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a 
compaction
-        : !ClusteringUtil.isClusteringInstant(instant, timeline);   // not a 
clustering
+        : !ClusteringUtils.isClusteringInstant(instant, timeline);   // not a 
clustering

Review Comment:
   Let's merge the methods of `ClusteringUtil` in hudi-flink into 
`ClusteringUtils` in hudi-common. Alternatively, rename `ClusteringUtil` to 
`FlinkClusteringUtil`.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -135,44 +127,37 @@ public static Builder builder() {
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
       boolean cdcEnabled) {
-    HoodieTimeline commitTimeline = getReadTimeline(metaClient);
-    if (commitTimeline.empty()) {
-      LOG.warn("No splits found for the table under path " + path);
+
+    IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .startTime(this.conf.getString(FlinkOptions.READ_START_COMMIT))
+        .endTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))

Review Comment:
   When is it possible that both READ_START_COMMIT and READ_END_COMMIT are 
null/empty?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries.
+ *
+ * <p>The analyzer can supply info about the incremental queries including:
+ * <ul>
+ *   <li>The archived instant candidates;</li>
+ *   <li>The active instant candidates;</li>
+ *   <li>The instant filtering predicate, e.g the instant range;</li>
+ *   <li>Whether the query starts from the earliest;</li>
+ *   <li>Whether the query ends to the latest;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File Handles Decoding</th>
+ *     <th>Instant Filtering Predicate</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, _]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA is a collection of all the 
instants completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, _]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[_, endTime]</td>
+ *     <td>i).find the last completed instant i_n before or on 'endTim;
+ *     ii). read the latest snapshot from table metadata if i_n is archived or 
the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, _]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed before or on 'endTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A range type is required for analyzing the query so that the query 
range boundary inclusiveness have clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if there are files missing from decoding the commit metadata.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final Option<String> startTime;
+  private final Option<String> endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = Option.ofNullable(startTime);
+    this.endTime = Option.ofNullable(endTime);
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {
+    try (CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(this.metaClient)) {
+      if (completionTimeQueryView.isEmptyTable()) {
+        // no dataset committed in the table
+        return QueryContext.EMPTY;
+      }
+      HoodieTimeline readTimeline = getReadTimeline(this.metaClient);
+      List<String> instantTimeList = 
completionTimeQueryView.getStartTimes(readTimeline, startTime, endTime, 
rangeType);
+      if (instantTimeList.isEmpty()) {
+        // no instants completed within the give time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      // get hoodie instants
+      Pair<List<String>, List<String>> splitInstantTime = 
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+      Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+      List<String> archivedInstantTime = splitInstantTime.getKey();
+      List<String> activeInstantTime = splitInstantTime.getValue();
+      List<HoodieInstant> archivedInstants = new ArrayList<>();
+      List<HoodieInstant> activeInstants = new ArrayList<>();
+      HoodieTimeline archivedReadTimeline = null;
+      if (!activeInstantTime.isEmpty()) {
+        activeInstants = readTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+        if (limit > 0 && limit < activeInstants.size()) {
+          // streaming read speed limit, limits the maximum number of commits 
allowed to read for each run
+          activeInstants = activeInstants.subList(0, limit);
+        }
+      }
+      if (!archivedInstantTime.isEmpty()) {
+        archivedReadTimeline = getArchivedReadTimeline(metaClient, 
archivedInstantTime.get(0));
+        archivedInstants = 
archivedReadTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+      }
+      List<String> instants = Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        // no instants completed within the give time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      if (startTime.isEmpty() && endTime.isPresent()) {

Review Comment:
   Where do we handle both startTime and endTime being empty?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -175,42 +187,111 @@ public Option<String> getCompletionTime(String 
startTime) {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param readTimeline The read timeline.
+   * @param rangeStart   The query range start completion time.
+   * @param rangeEnd     The query range end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTimes(
+      HoodieTimeline readTimeline,
+      Option<String> rangeStart,
+      Option<String> rangeEnd,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTimes(readTimeline, rangeStart, rangeEnd, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
   }
 
   /**
    * Queries the instant start time with given completion time range.
    *
-   * @param startCompletionTime   The start completion time.
-   * @param endCompletionTime     The end completion time.
-   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
-   *                              with the minimum completion time {@code 
startCompletionTime}.
+   * @param rangeStart              The query range start completion time.
+   * @param rangeEnd                The query range end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
-    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+  @VisibleForTesting
+  public List<String> getStartTimes(
+      String rangeStart,
+      String rangeEnd,
+      Function<String, String> earliestInstantTimeFunc) {
+    return 
getStartTimes(metaClient.getCommitsTimeline().filterCompletedInstants(), 
Option.ofNullable(rangeStart), Option.ofNullable(rangeEnd),
+        InstantRange.RangeType.CLOSE_CLOSE, earliestInstantTimeFunc);
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param readTimeline            The read timeline.
+   * @param rangeStart              The query range start completion time.
+   * @param rangeEnd                The query range end completion time.
+   * @param rangeType               The range type.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  public List<String> getStartTimes(

Review Comment:
   This is easily confused with `startTime` and `endTime` in 
`IncrementalQueryAnalyzer`. Let's assume each instant has a start timestamp and 
a completion timestamp. Then, can we rename this method to 
`getInstantStartTimestamps`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to