This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 225b03c5af8 [HUDI-7184] Add IncrementalQueryAnalyzer for completion 
time based in… (#10255)
225b03c5af8 is described below

commit 225b03c5af82a50c2f3e5760f3e552cd2594fc0b
Author: Danny Chan <[email protected]>
AuthorDate: Tue Feb 13 05:36:34 2024 +0800

    [HUDI-7184] Add IncrementalQueryAnalyzer for completion time based in… 
(#10255)
    
    * [HUDI-7184] Add IncrementalQueryAnalyzer for completion time based 
incremental queries
    
    * CR Feedback
    
    * More CR Feedback
    
    ---------
    
    Co-authored-by: vinoth chandar <[email protected]>
---
 .../hudi/table/action/compact/CompactHelpers.java  |   2 +-
 .../timeline/TestCompletionTimeQueryView.java      |   2 +-
 .../apache/hudi/common/table/log/InstantRange.java | 157 +++++---
 .../table/read/IncrementalQueryAnalyzer.java       | 435 +++++++++++++++++++++
 .../table/timeline/CompletionTimeQueryView.java    | 174 ++++++---
 .../table/timeline/HoodieInstantTimeGenerator.java |   4 +
 .../apache/hudi/common/util/ClusteringUtils.java   |  15 +
 .../metadata/HoodieMetadataLogRecordReader.java    |   2 +-
 .../apache/hudi/configuration/OptionsResolver.java |  14 +-
 .../apache/hudi/source/IncrementalInputSplits.java | 326 +++------------
 .../hudi/source/StreamReadMonitoringFunction.java  |   2 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |  20 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   3 +-
 .../hudi/source/TestIncrementalInputSplits.java    |  99 +++--
 .../source/TestStreamReadMonitoringFunction.java   |   4 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  67 ++--
 .../test/java/org/apache/hudi/utils/TestUtils.java |   6 +-
 .../java/org/apache/hudi/common/util/Option.java   |   4 +
 .../scala/org/apache/hudi/cdc/CDCRelation.scala    |   2 +-
 .../benchmark/LSMTimelineReadBenchmark.scala       |   2 +-
 20 files changed, 860 insertions(+), 480 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index 78ea56fd9ad..c4ef6444d2d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -117,7 +117,7 @@ public class CompactHelpers<T, I, K, O> {
         .build();
     Set<String> validInstants = 
HoodieTableMetadataUtil.getValidInstantTimestamps(dataMetaClient, 
metadataMetaClient);
     return InstantRange.builder()
-        .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
+        .rangeType(InstantRange.RangeType.EXACT_MATCH)
         .explicitInstants(validInstants).build();
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
index 9dbc9c8656f..3122473565e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java
@@ -114,7 +114,7 @@ public class TestCompletionTimeQueryView {
   }
 
   private String getInstantTimeSetFormattedString(CompletionTimeQueryView 
view, int completionTime1, int completionTime2) {
-    return view.getStartTimeSet(String.format("%08d", completionTime1), 
String.format("%08d", completionTime2), s -> String.format("%08d", 
Integer.parseInt(s) - 1000))
+    return view.getStartTimes(String.format("%08d", completionTime1), 
String.format("%08d", completionTime2), s -> String.format("%08d", 
Integer.parseInt(s) - 1000))
         .stream().sorted().collect(Collectors.joining(","));
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
index 96c7b0c0ddf..6dfc09dbf4f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java
@@ -19,25 +19,29 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
- * A instant commits range used for incremental reader filtering.
+ * An instant range used for incremental reader filtering.
  */
 public abstract class InstantRange implements Serializable {
   private static final long serialVersionUID = 1L;
 
-  protected final String startInstant;
-  protected final String endInstant;
+  protected final Option<String> startInstant;
+  protected final Option<String> endInstant;
 
   public InstantRange(String startInstant, String endInstant) {
-    this.startInstant = startInstant;
-    this.endInstant = endInstant;
+    this.startInstant = Option.ofNullable(startInstant);
+    this.endInstant = Option.ofNullable(endInstant);
   }
 
   /**
@@ -47,11 +51,11 @@ public abstract class InstantRange implements Serializable {
     return new Builder();
   }
 
-  public String getStartInstant() {
+  public Option<String> getStartInstant() {
     return startInstant;
   }
 
-  public String getEndInstant() {
+  public Option<String> getEndInstant() {
     return endInstant;
   }
 
@@ -60,8 +64,8 @@ public abstract class InstantRange implements Serializable {
   @Override
   public String toString() {
     return "InstantRange{"
-        + "startInstant='" + startInstant == null ? "null" : startInstant + 
'\''
-        + ", endInstant='" + endInstant == null ? "null" : endInstant + '\''
+        + "startInstant='" + (startInstant.isEmpty() ? "-INF" : 
startInstant.get()) + '\''
+        + ", endInstant='" + (endInstant.isEmpty() ? "+INF" : 
endInstant.get()) + '\''
         + ", rangeType='" + this.getClass().getSimpleName() + '\''
         + '}';
   }
@@ -73,89 +77,98 @@ public abstract class InstantRange implements Serializable {
   /**
    * Represents a range type.
    */
-  public static enum RangeType {
-    OPEN_CLOSE, CLOSE_CLOSE, EXPLICIT_MATCH
+  public enum RangeType {
+    /** Start instant is not included (>) and end instant is included (<=). */
+    OPEN_CLOSED,
+    /** Both start and end instants are included (>=, <=). */
+    CLOSED_CLOSED,
+    /** Exact match of instants. */
+    EXACT_MATCH,
+    /** Composition of multiple ranges. */
+    COMPOSITION
   }
 
-  private static class OpenCloseRange extends InstantRange {
+  private static class OpenClosedRange extends InstantRange {
 
-    public OpenCloseRange(String startInstant, String endInstant) {
+    public OpenClosedRange(String startInstant, String endInstant) {
       super(Objects.requireNonNull(startInstant), endInstant);
     }
 
     @Override
     public boolean isInRange(String instant) {
-      // No need to do comparison:
-      // HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
-      // because the logic is ensured by the log scanner
-      return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, startInstant);
+      boolean validAgainstStart = HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, startInstant.get());
+      // if there is an end instant, check against it, otherwise assume +INF 
and it's always valid.
+      boolean validAgainstEnd = endInstant
+              .map(e -> HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+              .orElse(true);
+      return validAgainstStart && validAgainstEnd;
     }
   }
 
-  private static class OpenCloseRangeNullableBoundary extends InstantRange {
+  private static class OpenClosedRangeNullableBoundary extends InstantRange {
 
-    public OpenCloseRangeNullableBoundary(String startInstant, String 
endInstant) {
+    public OpenClosedRangeNullableBoundary(String startInstant, String 
endInstant) {
       super(startInstant, endInstant);
-      ValidationUtils.checkArgument(startInstant != null || endInstant != null,
-          "Start and end instants can not both be null");
+      ValidationUtils.checkArgument(!startInstant.isEmpty() || 
!endInstant.isEmpty(),
+          "At least one of start and end instants should be specified.");
     }
 
     @Override
     public boolean isInRange(String instant) {
-      if (startInstant == null) {
-        return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
-      } else if (endInstant == null) {
-        return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, startInstant);
-      } else {
-        return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, startInstant)
-            && HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
-      }
+      boolean validAgainstStart = startInstant
+              .map(s -> HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, s))
+              .orElse(true);
+      boolean validAgainstEnd = endInstant
+              .map(e -> HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+              .orElse(true);
+
+      return validAgainstStart && validAgainstEnd;
     }
   }
 
-  private static class CloseCloseRange extends InstantRange {
+  private static class ClosedClosedRange extends InstantRange {
 
-    public CloseCloseRange(String startInstant, String endInstant) {
+    public ClosedClosedRange(String startInstant, String endInstant) {
       super(Objects.requireNonNull(startInstant), endInstant);
     }
 
     @Override
     public boolean isInRange(String instant) {
-      // No need to do comparison:
-      // HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
-      // because the logic is ensured by the log scanner
-      return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
+      boolean validAgainstStart = HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant.get());
+      boolean validAgainstEnd = endInstant
+              .map(e -> HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+              .orElse(true);
+      return validAgainstStart && validAgainstEnd;
     }
   }
 
-  private static class CloseCloseRangeNullableBoundary extends InstantRange {
+  private static class ClosedClosedRangeNullableBoundary extends InstantRange {
 
-    public CloseCloseRangeNullableBoundary(String startInstant, String 
endInstant) {
+    public ClosedClosedRangeNullableBoundary(String startInstant, String 
endInstant) {
       super(startInstant, endInstant);
-      ValidationUtils.checkArgument(startInstant != null || endInstant != null,
-          "Start and end instants can not both be null");
+      ValidationUtils.checkArgument(!startInstant.isEmpty() || 
!endInstant.isEmpty(),
+          "At least one of start and end instants should be specified.");
     }
 
     @Override
     public boolean isInRange(String instant) {
-      if (startInstant == null) {
-        return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
-      } else if (endInstant == null) {
-        return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
-      } else {
-        return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant)
-            && HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant);
-      }
+      boolean validAgainstStart = startInstant
+              .map(s -> HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, s))
+              .orElse(true);
+      boolean validAgainstEnd = endInstant
+              .map(e -> HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, e))
+              .orElse(true);
+      return validAgainstStart && validAgainstEnd;
     }
   }
 
   /**
    * Class to assist in checking if an instant is part of a set of instants.
    */
-  private static class ExplicitMatchRange extends InstantRange {
+  private static class ExactMatchRange extends InstantRange {
     Set<String> instants;
 
-    public ExplicitMatchRange(Set<String> instants) {
+    public ExactMatchRange(Set<String> instants) {
       super(Collections.min(instants), Collections.max(instants));
       this.instants = instants;
     }
@@ -166,6 +179,28 @@ public abstract class InstantRange implements Serializable 
{
     }
   }
 
+  /**
+   * Composition of multiple instant ranges in disjunctive form.
+   */
+  private static class CompositionRange extends InstantRange {
+    List<InstantRange> instantRanges;
+
+    public CompositionRange(List<InstantRange> instantRanges) {
+      super(null, null);
+      this.instantRanges = Objects.requireNonNull(instantRanges, "Instant 
ranges should not be null");
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      for (InstantRange range : instantRanges) {
+        if (range.isInRange(instant)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
@@ -179,6 +214,7 @@ public abstract class InstantRange implements Serializable {
     private RangeType rangeType;
     private boolean nullableBoundary = false;
     private Set<String> explicitInstants;
+    private List<InstantRange> instantRanges;
 
     private Builder() {
     }
@@ -208,19 +244,26 @@ public abstract class InstantRange implements 
Serializable {
       return this;
     }
 
+    public Builder instantRanges(InstantRange... instantRanges) {
+      this.instantRanges = 
Arrays.stream(instantRanges).collect(Collectors.toList());
+      return this;
+    }
+
     public InstantRange build() {
       ValidationUtils.checkState(this.rangeType != null, "Range type is 
required");
       switch (rangeType) {
-        case OPEN_CLOSE:
+        case OPEN_CLOSED:
           return nullableBoundary
-              ? new OpenCloseRangeNullableBoundary(startInstant, endInstant)
-              : new OpenCloseRange(startInstant, endInstant);
-        case CLOSE_CLOSE:
+              ? new OpenClosedRangeNullableBoundary(startInstant, endInstant)
+              : new OpenClosedRange(startInstant, endInstant);
+        case CLOSED_CLOSED:
           return nullableBoundary
-              ? new CloseCloseRangeNullableBoundary(startInstant, endInstant)
-              : new CloseCloseRange(startInstant, endInstant);
-        case EXPLICIT_MATCH:
-          return new ExplicitMatchRange(this.explicitInstants);
+              ? new ClosedClosedRangeNullableBoundary(startInstant, endInstant)
+              : new ClosedClosedRange(startInstant, endInstant);
+        case EXACT_MATCH:
+          return new ExactMatchRange(this.explicitInstants);
+        case COMPOSITION:
+          return new CompositionRange(this.instantRanges);
         default:
           throw new AssertionError();
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
new file mode 100644
index 00000000000..03596354bb1
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Analyzer for incremental queries on the timeline, to filter instants based 
on specified ranges.
+ *
+ * <p>The analyzer is supplied the following information:
+ * <ul>
+ *   <li>The archived instants;</li>
+ *   <li>The active instants;</li>
+ *   <li>The instant filtering predicate, e.g the instant range with a 
"startTime" and "endTime"</li>
+ *   <li>Whether the query starts from the "earliest" available instant;</li>
+ *   <li>Whether the query ends at the "latest" available instant;</li>
+ *   <li>The max completion time used for fs view file slice version 
filtering.</li>
+ * </ul>
+ *
+ * <p><h2>Criteria for different query ranges:</h2>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Query Range</th>
+ *     <th>File selection criteria</th>
+ *     <th>Instant filtering predicate applied to selected files</th>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, +INF]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>_</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[earliest, endTime]</td>
+ *     <td>The latest snapshot files from table metadata</td>
+ *     <td>'_hoodie_commit_time' in setA, setA contains the begin instant 
times for actions completed before or on 'endTime'</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[-INF, +INF]</td>
+ *     <td>The latest completed instant metadata</td>
+ *     <td>'_hoodie_commit_time' = i_n, i_n is the latest completed 
instant</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[-INF, endTime]</td>
+ *     <td>I) find the last completed instant i_n before or on 'endTime';
+ *         II) read the latest snapshot from table metadata if i_n is archived 
or the commit metadata if it is still active</td>
+ *     <td>'_hoodie_commit_time' = i_n</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, +INF]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed after or on 'startTime';
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ *   <tr>
+ *     <td>[startTime, endTime]</td>
+ *     <td>i).find the instant set setA, setA is a collection of all the 
instants completed in the given time range;
+ *     ii). read the latest snapshot from table metadata if setA has archived 
instants or the commit metadata if all the instants are still active</td>
+ *     <td>'_hoodie_commit_time' in setA</td>
+ *   </tr>
+ * </table>
+ *
+ * <p> A {@code RangeType} is required for analyzing the query so that the 
query range boundary inclusiveness has clear semantics.
+ *
+ * <p>IMPORTANT: the reader may optionally choose to fall back to reading the 
latest snapshot if the files for decoding the commit metadata have already been 
cleaned.
+ */
+public class IncrementalQueryAnalyzer {
+  public static final String START_COMMIT_EARLIEST = "earliest";
+
+  private final HoodieTableMetaClient metaClient;
+  private final Option<String> startTime;
+  private final Option<String> endTime;
+  private final InstantRange.RangeType rangeType;
+  private final boolean skipCompaction;
+  private final boolean skipClustering;
+  private final int limit;
+
+  private IncrementalQueryAnalyzer(
+      HoodieTableMetaClient metaClient,
+      String startTime,
+      String endTime,
+      InstantRange.RangeType rangeType,
+      boolean skipCompaction,
+      boolean skipClustering,
+      int limit) {
+    this.metaClient = metaClient;
+    this.startTime = Option.ofNullable(startTime);
+    this.endTime = Option.ofNullable(endTime);
+    this.rangeType = rangeType;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns a builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Analyzes the incremental query context with given completion time range.
+   *
+   * @return An incremental query context including the instant time range 
info.
+   */
+  public QueryContext analyze() {
+    try (CompletionTimeQueryView completionTimeQueryView = new 
CompletionTimeQueryView(this.metaClient)) {
+      if (completionTimeQueryView.isEmptyTable()) {
+        // no dataset committed in the table
+        return QueryContext.EMPTY;
+      }
+      HoodieTimeline filteredTimeline = getFilteredTimeline(this.metaClient);
+      List<String> instantTimeList = 
completionTimeQueryView.getStartTimes(filteredTimeline, startTime, endTime, 
rangeType);
+      if (instantTimeList.isEmpty()) {
+        // no instants completed within the given time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      // get hoodie instants
+      Pair<List<String>, List<String>> splitInstantTime = 
splitInstantByActiveness(instantTimeList, completionTimeQueryView);
+      Set<String> instantTimeSet = new HashSet<>(instantTimeList);
+      List<String> archivedInstantTime = splitInstantTime.getLeft();
+      List<String> activeInstantTime = splitInstantTime.getRight();
+      List<HoodieInstant> archivedInstants = new ArrayList<>();
+      List<HoodieInstant> activeInstants = new ArrayList<>();
+      HoodieTimeline archivedReadTimeline = null;
+      if (!activeInstantTime.isEmpty()) {
+        activeInstants = filteredTimeline.getInstantsAsStream().filter(instant 
-> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+        if (limit > 0 && limit < activeInstants.size()) {
+          // streaming read speed limit, limits the maximum number of commits 
allowed to read for each run
+          activeInstants = activeInstants.subList(0, limit);
+        }
+      }
+      if (!archivedInstantTime.isEmpty()) {
+        archivedReadTimeline = getArchivedReadTimeline(metaClient, 
archivedInstantTime.get(0));
+        archivedInstants = 
archivedReadTimeline.getInstantsAsStream().filter(instant -> 
instantTimeSet.contains(instant.getTimestamp())).collect(Collectors.toList());
+      }
+      List<String> instants = Stream.concat(archivedInstants.stream(), 
activeInstants.stream()).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        // no instants completed within the given time range, returns early.
+        return QueryContext.EMPTY;
+      }
+      if (startTime.isEmpty() && endTime.isPresent()) {
+        instants = Collections.singletonList(instants.get(instants.size() - 
1));
+      }
+      String lastInstant = instants.get(instants.size() - 1);
+      // startInstant is null (-INF) when the query starts from 'earliest'; if 
no start time is specified, start from the latest instant like usual streaming 
read semantics;
+      // otherwise, use the earliest instant in the range as the start 
instant.
+      String startInstant = 
START_COMMIT_EARLIEST.equalsIgnoreCase(startTime.orElse(null)) ? null : 
startTime.isEmpty() ? lastInstant : instants.get(0);
+      String endInstant = endTime.isEmpty() ? null : lastInstant;
+      return QueryContext.create(startInstant, endInstant, instants, 
archivedInstants, activeInstants, filteredTimeline, archivedReadTimeline);
+    }
+  }
+
+  /**
+   * Splits the given instant time list into a pair of archived instant list 
and active instant list.
+   */
+  private static Pair<List<String>, List<String>> 
splitInstantByActiveness(List<String> instantTimeList, CompletionTimeQueryView 
completionTimeQueryView) {
+    int firstActiveIdx = IntStream.range(0, instantTimeList.size()).filter(i 
-> 
!completionTimeQueryView.isArchived(instantTimeList.get(i))).findFirst().orElse(-1);
+    if (firstActiveIdx == -1) {
+      return Pair.of(instantTimeList, Collections.emptyList());
+    } else if (firstActiveIdx == 0) {
+      return Pair.of(Collections.emptyList(), instantTimeList);
+    } else {
+      return Pair.of(instantTimeList.subList(0, firstActiveIdx), 
instantTimeList.subList(firstActiveIdx, instantTimeList.size()));
+    }
+  }
+
+  private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient) 
{
+    HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, timeline, 
this.skipCompaction, this.skipClustering);
+  }
+
+  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
+    HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+    return filterInstantsAsPerUserConfigs(metaClient, 
archivedCompleteTimeline, this.skipCompaction, this.skipClustering);
+  }
+
+  /**
+   * Filters out the unnecessary instants as per user specified configs.
+   *
+   * @param timeline The timeline.
+   *
+   * @return the filtered timeline
+   */
+  @VisibleForTesting
+  public static HoodieTimeline 
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline 
timeline, boolean skipCompaction, boolean skipClustering) {
+    final HoodieTimeline oriTimeline = timeline;
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ & 
skipCompaction) {
+      // the compaction commit uses 'commit' as action which is tricky
+      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    }
+    if (skipClustering) {
+      timeline = timeline.filter(instant -> 
!ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+    }
+    return timeline;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+  /**
+   * Builder for {@link IncrementalQueryAnalyzer}.
+   */
+  public static class Builder {
+    /**
+     * Start completion time.
+     */
+    private String startTime;
+    /**
+     * End completion time.
+     */
+    private String endTime;
+    private InstantRange.RangeType rangeType;
+    private HoodieTableMetaClient metaClient;
+    private boolean skipCompaction = false;
+    private boolean skipClustering = false;
+    /**
+     * Maximum number of instants to read per run.
+     */
+    private int limit = -1;
+
+    public Builder() {
+    }
+
+    public Builder startTime(String startTime) {
+      this.startTime = startTime;
+      return this;
+    }
+
+    public Builder endTime(String endTime) {
+      this.endTime = endTime;
+      return this;
+    }
+
+    public Builder rangeType(InstantRange.RangeType rangeType) {
+      this.rangeType = rangeType;
+      return this;
+    }
+
+    public Builder metaClient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
+    public Builder skipCompaction(boolean skipCompaction) {
+      this.skipCompaction = skipCompaction;
+      return this;
+    }
+
+    public Builder skipClustering(boolean skipClustering) {
+      this.skipClustering = skipClustering;
+      return this;
+    }
+
+    public Builder limit(int limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    public IncrementalQueryAnalyzer build() {
+      return new 
IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient), 
this.startTime, this.endTime,
+          Objects.requireNonNull(this.rangeType), this.skipCompaction, 
this.skipClustering, this.limit);
+    }
+  }
+
+  /**
+   * Represents the analyzed query context.
+   */
+  public static class QueryContext {
+    public static final QueryContext EMPTY = new QueryContext(null, null, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 
null, null);
+
+    /**
+     * An empty option indicates consumption from the earliest instant.
+     */
+    private final Option<String> startInstant;
+    /**
+     * An empty option indicates consumption to the latest instant.
+     */
+    private final Option<String> endInstant;
+    private final List<HoodieInstant> archivedInstants;
+    private final List<HoodieInstant> activeInstants;
+    /**
+     * The active timeline to read filtered by given configurations.
+     */
+    private final HoodieTimeline activeTimeline;
+    /**
+     * The archived timeline to read filtered by given configurations.
+     */
+    private final HoodieTimeline archivedTimeline;
+    private final List<String> instants;
+
+    private QueryContext(
+        @Nullable String startInstant,
+        @Nullable String endInstant,
+        List<String> instants,
+        List<HoodieInstant> archivedInstants,
+        List<HoodieInstant> activeInstants,
+        HoodieTimeline activeTimeline,
+        @Nullable HoodieTimeline archivedTimeline) {
+      this.startInstant = Option.ofNullable(startInstant);
+      this.endInstant = Option.ofNullable(endInstant);
+      this.archivedInstants = archivedInstants;
+      this.activeInstants = activeInstants;
+      this.activeTimeline = activeTimeline;
+      this.archivedTimeline = archivedTimeline;
+      this.instants = instants;
+    }
+
+    public static QueryContext create(
+        @Nullable String startInstant,
+        @Nullable String endInstant,
+        List<String> instants,
+        List<HoodieInstant> archivedInstants,
+        List<HoodieInstant> activeInstants,
+        HoodieTimeline activeTimeline,
+        @Nullable HoodieTimeline archivedTimeline) {
+      return new QueryContext(startInstant, endInstant, instants, 
archivedInstants, activeInstants, activeTimeline, archivedTimeline);
+    }
+
+    public boolean isEmpty() {
+      return this.instants.isEmpty();
+    }
+
+    public Option<String> getStartInstant() {
+      return startInstant;
+    }
+
+    public Option<String> getEndInstant() {
+      return endInstant;
+    }
+
+    /**
+     * Returns the latest instant time which should be included physically in 
reading.
+     */
+    public String getLastInstant() {
+      return this.instants.get(this.instants.size() - 1);
+    }
+
+    public List<HoodieInstant> getArchivedInstants() {
+      return archivedInstants;
+    }
+
+    public List<HoodieInstant> getActiveInstants() {
+      return activeInstants;
+    }
+
+    public boolean isConsumingFromEarliest() {
+      return startInstant.isEmpty();
+    }
+
+    public boolean isConsumingToLatest() {
+      return endInstant.isEmpty();
+    }
+
+    public String getMaxCompletionTime() {
+      if (this.activeInstants.size() > 0) {
+        return 
this.activeInstants.stream().map(HoodieInstant::getCompletionTime).filter(Objects::nonNull).max(String::compareTo).get();
+      } else {
+        // all the query instants are archived, use the latest active instant 
completion time as
+        // the file slice version upper threshold, because very probably these 
files already got cleaned;
+        // using the max completion time of the archived instants could yield 
empty file slices.
+        return 
this.activeTimeline.getInstantsAsStream().map(HoodieInstant::getCompletionTime).filter(Objects::nonNull).max(String::compareTo).get();
+      }
+    }
+
+    public Option<InstantRange> getInstantRange() {
+      if (isConsumingFromEarliest()) {
+        if (isConsumingToLatest()) {
+          // A null instant range indicates no filtering.
+          // short-cut for snapshot read
+          return Option.empty();
+        }
+        return Option.of(InstantRange.builder()
+                .startInstant(startInstant.orElse(null))
+                .endInstant(endInstant.orElse(null))
+                .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+                .nullableBoundary(true)
+                .build());
+      } else {
+        return Option.of(InstantRange.builder()
+                .rangeType(InstantRange.RangeType.EXACT_MATCH)
+                .explicitInstants(new HashSet<>(instants))
+                .build());
+      }
+    }
+
+    public HoodieTimeline getActiveTimeline() {
+      return this.activeTimeline;
+    }
+
+    public @Nullable HoodieTimeline getArchivedTimeline() {
+      return archivedTimeline;
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
index 6bac49b83c9..e14381bd373 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java
@@ -21,22 +21,28 @@ package org.apache.hudi.common.table.timeline;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.Serializable;
 import java.time.Instant;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.START_COMMIT_EARLIEST;
 import static 
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 
 /**
  * Query view for instant completion time.
@@ -54,13 +60,13 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
    * Mapping from instant start time -> completion time.
    * Should be thread-safe data structure.
    */
-  private final Map<String, String> startToCompletionInstantTimeMap;
+  private final ConcurrentMap<String, String> beginToCompletionInstantTimeMap;
 
   /**
    * The cursor instant time to eagerly load from, by default load last N days 
of completed instants.
-   * It is tuned dynamically with lazy loading occurs, assumes an initial 
cursor instant as t10,
-   * a completion query for t5 would trigger a lazy loading with this cursor 
instant been updated as t5.
-   * The sliding of the cursor instant economizes redundant loading from 
different queries.
+   * It can grow dynamically with lazy loading. e.g. assuming an initial 
cursor instant as t10,
+   * a completion query for t5 would trigger lazy loading with this cursor 
instant updated to t5.
+   * This sliding window model amortizes redundant loading from different 
queries.
    */
   private volatile String cursorInstant;
 
@@ -82,12 +88,12 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
    * The constructor.
    *
    * @param metaClient   The table meta client.
-   * @param cursorInstant The earliest instant time to eagerly load from, by 
default load last N days of completed instants.
+   * @param eagerLoadInstant The earliest instant time to eagerly load from, 
by default load last N days of completed instants.
    */
-  public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
cursorInstant) {
+  public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String 
eagerLoadInstant) {
     this.metaClient = metaClient;
-    this.startToCompletionInstantTimeMap = new ConcurrentHashMap<>();
-    this.cursorInstant = HoodieTimeline.minInstant(cursorInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
+    this.beginToCompletionInstantTimeMap = new ConcurrentHashMap<>();
+    this.cursorInstant = HoodieTimeline.minInstant(eagerLoadInstant, 
metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp).orElse(""));
     // Note: use getWriteTimeline() to keep sync with the fs view 
visibleCommitsAndCompactionTimeline, see 
AbstractTableFileSystemView.refreshTimeline.
     this.firstNonSavepointCommit = 
metaClient.getActiveTimeline().getWriteTimeline().getFirstNonSavepointCommit().map(HoodieInstant::getTimestamp).orElse("");
     load();
@@ -96,9 +102,16 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   /**
    * Returns whether the instant is completed.
    */
-  public boolean isCompleted(String instantTime) {
-    return this.startToCompletionInstantTimeMap.containsKey(instantTime)
-        || HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN, 
this.firstNonSavepointCommit);
+  public boolean isCompleted(String beginInstantTime) {
+    // archival does not proceed beyond the first savepoint, so any instant 
before that is completed.
+    return this.beginToCompletionInstantTimeMap.containsKey(beginInstantTime) 
|| isArchived(beginInstantTime);
+  }
+
+  /**
+   * Returns whether the instant is archived.
+   */
+  public boolean isArchived(String instantTime) {
+    return HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN, 
this.firstNonSavepointCommit);
   }
 
   /**
@@ -113,7 +126,7 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   }
 
   /**
-   * Returns whether the give instant time {@code instantTime} is sliced after 
or on the base instant {@code baseInstant}.
+   * Returns whether the given instant time {@code instantTime} is sliced 
after or on the base instant {@code baseInstant}.
    */
   public boolean isSlicedAfterOrOn(String baseInstant, String instantTime) {
     Option<String> completionTimeOpt = getCompletionTime(baseInstant, 
instantTime);
@@ -153,21 +166,21 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
   /**
    * Queries the instant completion time with given start time.
    *
-   * @param startTime The start time.
+   * @param beginTime The start time.
    *
    * @return The completion time if the instant finished or empty if it is 
still pending.
    */
-  public Option<String> getCompletionTime(String startTime) {
-    String completionTime = 
this.startToCompletionInstantTimeMap.get(startTime);
+  public Option<String> getCompletionTime(String beginTime) {
+    String completionTime = 
this.beginToCompletionInstantTimeMap.get(beginTime);
     if (completionTime != null) {
       return Option.of(completionTime);
     }
-    if (HoodieTimeline.compareTimestamps(startTime, GREATER_THAN_OR_EQUALS, 
this.cursorInstant)) {
+    if (HoodieTimeline.compareTimestamps(beginTime, GREATER_THAN_OR_EQUALS, 
this.cursorInstant)) {
       // the instant is still pending
       return Option.empty();
     }
-    loadCompletionTimeIncrementally(startTime);
-    return 
Option.ofNullable(this.startToCompletionInstantTimeMap.get(startTime));
+    loadCompletionTimeIncrementally(beginTime);
+    return 
Option.ofNullable(this.beginToCompletionInstantTimeMap.get(beginTime));
   }
 
   /**
@@ -175,42 +188,111 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
    *
    * <p>By default, assumes there is at most 1 day time of duration for an 
instant to accelerate the queries.
    *
-   * @param startCompletionTime The start completion time.
-   * @param endCompletionTime   The end completion time.
+   * @param timeline The timeline.
+   * @param rangeStart   The query range start completion time.
+   * @param rangeEnd     The query range end completion time.
+   * @param rangeType    The range type.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime) {
+  public List<String> getStartTimes(
+      HoodieTimeline timeline,
+      Option<String> rangeStart,
+      Option<String> rangeEnd,
+      InstantRange.RangeType rangeType) {
     // assumes any instant/transaction lasts at most 1 day to optimize the 
query efficiency.
-    return getStartTimeSet(startCompletionTime, endCompletionTime, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
+    return getStartTimes(timeline, rangeStart, rangeEnd, rangeType, s -> 
HoodieInstantTimeGenerator.instantTimeMinusMillis(s, MILLI_SECONDS_IN_ONE_DAY));
   }
 
   /**
    * Queries the instant start time with given completion time range.
    *
-   * @param startCompletionTime   The start completion time.
-   * @param endCompletionTime     The end completion time.
-   * @param earliestStartTimeFunc The function to generate the earliest start 
time boundary
-   *                              with the minimum completion time {@code 
startCompletionTime}.
+   * @param rangeStart              The query range start completion time.
+   * @param rangeEnd                The query range end completion time.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
    *
-   * @return The instant time set.
+   * @return The sorted instant time list.
    */
-  public Set<String> getStartTimeSet(String startCompletionTime, String 
endCompletionTime, Function<String, String> earliestStartTimeFunc) {
-    String startInstant = earliestStartTimeFunc.apply(startCompletionTime);
+  @VisibleForTesting
+  public List<String> getStartTimes(
+      String rangeStart,
+      String rangeEnd,
+      Function<String, String> earliestInstantTimeFunc) {
+    return 
getStartTimes(metaClient.getCommitsTimeline().filterCompletedInstants(), 
Option.ofNullable(rangeStart), Option.ofNullable(rangeEnd),
+        InstantRange.RangeType.CLOSED_CLOSED, earliestInstantTimeFunc);
+  }
+
+  /**
+   * Queries the instant start time with given completion time range.
+   *
+   * @param timeline            The timeline.
+   * @param rangeStart              The query range start completion time.
+   * @param rangeEnd                The query range end completion time.
+   * @param rangeType               The range type.
+   * @param earliestInstantTimeFunc The function to generate the earliest 
start time boundary
+   *                                with the minimum completion time.
+   *
+   * @return The sorted instant time list.
+   */
+  public List<String> getStartTimes(
+      HoodieTimeline timeline,
+      Option<String> rangeStart,
+      Option<String> rangeEnd,
+      InstantRange.RangeType rangeType,
+      Function<String, String> earliestInstantTimeFunc) {
+    final boolean startFromEarliest = 
START_COMMIT_EARLIEST.equalsIgnoreCase(rangeStart.orElse(null));
+    String earliestInstantToLoad = null;
+    if (rangeStart.isPresent() && !startFromEarliest) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(rangeStart.get());
+    } else if (rangeEnd.isPresent()) {
+      earliestInstantToLoad = earliestInstantTimeFunc.apply(rangeEnd.get());
+    }
+
+    // ensure the earliest instant boundary be loaded.
+    if (earliestInstantToLoad != null && 
HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
earliestInstantToLoad)) {
+      loadCompletionTimeIncrementally(earliestInstantToLoad);
+    }
+
+    if (rangeStart.isEmpty() && rangeEnd.isPresent()) {
+      // returns the last instant that finished at or before the given 
completion time 'endTime'.
+      String maxInstantTime = timeline.getInstantsAsStream()
+          .filter(instant -> instant.isCompleted() && 
HoodieTimeline.compareTimestamps(instant.getCompletionTime(), 
LESSER_THAN_OR_EQUALS, rangeEnd.get()))
+          
.max(Comparator.comparing(HoodieInstant::getCompletionTime)).map(HoodieInstant::getTimestamp).orElse(null);
+      if (maxInstantTime != null) {
+        return Collections.singletonList(maxInstantTime);
+      }
+      // fallback to archived timeline
+      return this.beginToCompletionInstantTimeMap.entrySet().stream()
+          .filter(entry -> HoodieTimeline.compareTimestamps(entry.getValue(), 
LESSER_THAN_OR_EQUALS, rangeEnd.get()))
+          .map(Map.Entry::getKey).collect(Collectors.toList());
+    }
+
+    if (startFromEarliest) {
+      // expedience for snapshot read: ['earliest', _) to avoid loading 
unnecessary instants.
+      rangeStart = Option.empty();
+    }
+
+    if (rangeStart.isEmpty() && rangeEnd.isEmpty()) {
+      // (_, _): read the latest snapshot.
+      return timeline.filterCompletedInstants().lastInstant().map(instant -> 
Collections.singletonList(instant.getTimestamp())).orElse(Collections.emptyList());
+    }
+
     final InstantRange instantRange = InstantRange.builder()
-        .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
-        .startInstant(startCompletionTime)
-        .endInstant(endCompletionTime)
+        .rangeType(rangeType)
+        .startInstant(rangeStart.orElse(null))
+        .endInstant(rangeEnd.orElse(null))
         .nullableBoundary(true)
         .build();
-    if (HoodieTimeline.compareTimestamps(this.cursorInstant, GREATER_THAN, 
startInstant)) {
-      loadCompletionTimeIncrementally(startInstant);
-    }
-    return this.startToCompletionInstantTimeMap.entrySet().stream()
+    return this.beginToCompletionInstantTimeMap.entrySet().stream()
         .filter(entry -> instantRange.isInRange(entry.getValue()))
-        .map(Map.Entry::getKey).collect(Collectors.toSet());
+        .map(Map.Entry::getKey).sorted().collect(Collectors.toList());
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
   private void loadCompletionTimeIncrementally(String startTime) {
     // the 'startTime' should be out of the eager loading range, switch to a 
lazy loading.
     // This operation is resource costly.
@@ -250,20 +332,24 @@ public class CompletionTimeQueryView implements 
AutoCloseable, Serializable {
     setCompletionTime(instantTime, completionTime);
   }
 
-  private void setCompletionTime(String instantTime, String completionTime) {
+  private void setCompletionTime(String beginInstantTime, String 
completionTime) {
     if (completionTime == null) {
       // the meta-server instant does not have completion time
-      completionTime = instantTime;
+      completionTime = beginInstantTime;
     }
-    this.startToCompletionInstantTimeMap.putIfAbsent(instantTime, 
completionTime);
+    this.beginToCompletionInstantTimeMap.putIfAbsent(beginInstantTime, 
completionTime);
   }
 
   public String getCursorInstant() {
     return cursorInstant;
   }
 
+  public boolean isEmptyTable() {
+    return this.beginToCompletionInstantTimeMap.isEmpty();
+  }
+
   @Override
-  public void close() throws Exception {
-    this.startToCompletionInstantTimeMap.clear();
+  public void close() {
+    this.beginToCompletionInstantTimeMap.clear();
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index 2e48e40820d..a4f4b2cdf24 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -108,6 +108,10 @@ public class HoodieInstantTimeGenerator {
   public static String instantTimeMinusMillis(String timestamp, long 
milliseconds) {
     try {
       String timestampInMillis = fixInstantTimeCompatibility(timestamp);
+      // Tests may generate arbitrary short timestamps; for those, return an
+      // all-zeros timestamp of the same length instead of attempting date arithmetic.
+      if (timestampInMillis.length() < MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) 
{
+        return String.format("%0" + timestampInMillis.length() + "d", 0);
+      }
       LocalDateTime dt = LocalDateTime.parse(timestampInMillis, 
MILLIS_INSTANT_TIME_FORMATTER);
       ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ? 
ZoneId.of("UTC") : ZoneId.systemDefault();
       return 
MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index b7ed15a07af..e9ae9cfbdfb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -35,6 +35,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -298,4 +299,18 @@ public class ClusteringUtils {
     }
     return oldestInstantToRetain;
   }
+
+  /**
+   * Returns whether the given instant {@code instant} is with clustering 
operation.
+   */
+  public static boolean isClusteringInstant(HoodieInstant instant, 
HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      return TimelineUtils.getCommitMetadata(instant, 
timeline).getOperationType().equals(WriteOperationType.CLUSTER);
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for 
instant: " + instant, e);
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index a4a6da2e29b..49f067b8442 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -217,7 +217,7 @@ public class HoodieMetadataLogRecordReader implements 
Closeable {
 
     public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) 
{
       InstantRange instantRange = InstantRange.builder()
-          .rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
+          .rangeType(InstantRange.RangeType.EXACT_MATCH)
           .explicitInstants(validLogBlockTimestamps).build();
       scannerBuilder.withInstantRange(Option.of(instantRange));
       return this;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index fbe8fcdd62b..ae1a86f36cc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -273,10 +273,10 @@ public class OptionsResolver {
   }
 
   /**
-   * Returns whether the read commits limit is specified.
+   * Returns the read commits limit or -1 if not specified.
    */
-  public static boolean hasReadCommitsLimit(Configuration conf) {
-    return conf.contains(FlinkOptions.READ_COMMITS_LIMIT);
+  public static int getReadCommitsLimit(Configuration conf) {
+    return conf.getInteger(FlinkOptions.READ_COMMITS_LIMIT, -1);
   }
 
   /**
@@ -407,4 +407,12 @@ public class OptionsResolver {
     }
     return options;
   }
+
+  /**
+   * Whether the reader only consumes new commit instants.
+   */
+  public static boolean isOnlyConsumingNewCommits(Configuration conf) {
+    return isMorTable(conf) && 
conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT) // this is only true 
for flink.
+        || isAppendMode(conf) && 
conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 9586e9ce96c..c1cd5874d96 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.InstantRange;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -40,10 +40,7 @@ import 
org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
-import org.apache.hudi.util.ClusteringUtil;
-import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.types.logical.RowType;
@@ -67,11 +64,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
-
 /**
  * Utilities to generate incremental input splits {@link 
MergeOnReadInputSplit}.
  * The input splits are used for streaming and incremental read.
@@ -135,44 +127,37 @@ public class IncrementalInputSplits implements 
Serializable {
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
       boolean cdcEnabled) {
-    HoodieTimeline commitTimeline = getReadTimeline(metaClient);
-    if (commitTimeline.empty()) {
-      LOG.warn("No splits found for the table under path " + path);
+
+    IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .startTime(this.conf.getString(FlinkOptions.READ_START_COMMIT))
+        .endTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))
+        .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+        .skipCompaction(skipCompaction)
+        .skipClustering(skipClustering)
+        .build();
+
+    IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
+
+    if (analyzingResult.isEmpty()) {
+      LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
       return Result.EMPTY;
     }
-
-    final String startCommit = 
this.conf.getString(FlinkOptions.READ_START_COMMIT);
-    final String endCommit = this.conf.getString(FlinkOptions.READ_END_COMMIT);
-    final boolean startFromEarliest = 
FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit);
-    final boolean startOutOfRange = startCommit != null && 
commitTimeline.isBeforeTimelineStarts(startCommit);
-    final boolean endOutOfRange = endCommit != null && 
commitTimeline.isBeforeTimelineStarts(endCommit);
+    final HoodieTimeline commitTimeline = analyzingResult.getActiveTimeline();
+    final boolean startFromEarliest = 
analyzingResult.isConsumingFromEarliest();
+    final boolean hasArchivedInstants = 
!analyzingResult.getArchivedInstants().isEmpty();
     // We better add another premise: whether the endCommit is cleaned.
-    boolean fullTableScan = startFromEarliest || startOutOfRange || 
endOutOfRange;
-
-    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, 
null);
+    boolean fullTableScan = startFromEarliest || hasArchivedInstants;
 
     // Step1: generates the instant range
     // if the specified end commit is archived, still uses the specified 
timestamp,
     // else uses the latest filtered instant time
     // (would be the latest instant time if the specified end commit is 
greater than the latest instant time)
-    final String rangeEnd = endOutOfRange || instants.isEmpty() ? endCommit : 
instants.get(instants.size() - 1).getTimestamp();
-    // keep the same semantics with streaming read, default start from the 
latest commit
-    final String rangeStart = startFromEarliest ? null : (startCommit == null 
? rangeEnd : startCommit);
-    final InstantRange instantRange;
-    if (!fullTableScan) {
-      instantRange = 
InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
-          
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(cdcEnabled).build();
-    } else if (startFromEarliest && endCommit == null) {
-      // short-cut for snapshot read
-      instantRange = null;
-    } else {
-      instantRange = 
InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
-          
.rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
-    }
+
+    final InstantRange instantRange = 
analyzingResult.getInstantRange().orElse(null);
+
     // Step2: decides the read end commit
-    final String endInstant = endOutOfRange || endCommit == null
-        ? commitTimeline.lastInstant().get().getTimestamp()
-        : rangeEnd;
+    final String endInstant = analyzingResult.getLastInstant();
 
     // Step3: find out the files to read, tries to read the files from the 
commit metadata first,
     // fallback to full table scan if any of the following conditions matches:
@@ -192,10 +177,6 @@ public class IncrementalInputSplits implements 
Serializable {
       }
       fileStatuses = fileIndex.getFilesInPartitions();
     } else {
-      if (instants.size() == 0) {
-        LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
-        return Result.EMPTY;
-      }
       if (cdcEnabled) {
         // case1: cdc change log enabled
         List<MergeOnReadInputSplit> inputSplits = 
getCdcInputSplits(metaClient, instantRange);
@@ -203,6 +184,7 @@ public class IncrementalInputSplits implements Serializable 
{
       }
       // case2: normal incremental read
       String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+      List<HoodieInstant> instants = analyzingResult.getActiveInstants();
       List<HoodieCommitMetadata> metadataList = instants.stream()
           .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline)).collect(Collectors.toList());
       readPartitions = getReadPartitions(metadataList);
@@ -233,7 +215,7 @@ public class IncrementalInputSplits implements Serializable 
{
     }
 
     List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange, false);
+        fileStatuses, readPartitions, endInstant, 
analyzingResult.getMaxCompletionTime(), instantRange, false);
 
     return Result.instance(inputSplits, endInstant);
   }
@@ -242,7 +224,6 @@ public class IncrementalInputSplits implements Serializable 
{
    * Returns the incremental input splits.
    *
    * @param metaClient    The meta client
-   * @param issuedInstant The last issued instant, only valid in streaming read
    * @param issuedOffset  The last issued offset, only valid in streaming read
    * @param cdcEnabled    Whether cdc is enabled
    *
@@ -250,51 +231,36 @@ public class IncrementalInputSplits implements 
Serializable {
    */
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
-      @Nullable String issuedInstant,
       @Nullable String issuedOffset,
       boolean cdcEnabled) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = getReadTimeline(metaClient);
-    if (commitTimeline.empty()) {
-      LOG.warn("No splits found for the table under path " + path);
-      return Result.EMPTY;
-    }
+    IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .startTime(issuedOffset != null ? issuedOffset : 
this.conf.getString(FlinkOptions.READ_START_COMMIT))
+        .endTime(this.conf.getString(FlinkOptions.READ_END_COMMIT))
+        .rangeType(issuedOffset != null ? InstantRange.RangeType.OPEN_CLOSED : 
InstantRange.RangeType.CLOSED_CLOSED)
+        .skipCompaction(skipCompaction)
+        .skipClustering(skipClustering)
+        .limit(OptionsResolver.getReadCommitsLimit(conf))
+        .build();
 
-    // Assumes a timeline:
-    //    c1.inflight, c2(issued instant), c3, c4
-    // -> c1,          c2(issued instant), c3, c4
-    // c1, c3 and c4 are the candidate instants,
-    // we call c1 a 'hollow' instant which has lower version number but 
greater completion time,
-    // filtering the timeline using just c2 could cause data loss,
-    // check these hollow instants first.
-    Result hollowSplits = getHollowInputSplits(metaClient, 
metaClient.getHadoopConf(), issuedInstant, issuedOffset, commitTimeline, 
cdcEnabled);
-
-    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, 
issuedInstant);
-    // streaming read speed limit, limits the maximum number of commits 
allowed to read in each instant check
-    if (OptionsResolver.hasReadCommitsLimit(conf)) {
-      int readLimit = this.conf.getInteger(FlinkOptions.READ_COMMITS_LIMIT);
-      instants = instants.subList(0, Math.min(readLimit, instants.size()));
-    }
+    IncrementalQueryAnalyzer.QueryContext queryContext = analyzer.analyze();
 
-    // get the latest instant that satisfies condition
-    final String endInstant = instants.size() == 0 ? null : 
instants.get(instants.size() - 1).getTimestamp();
-    final InstantRange instantRange;
-    if (endInstant != null) {
-      // when cdc is enabled, returns instant range with nullable boundary
-      // to filter the reading instants on the timeline
-      instantRange = getInstantRange(issuedInstant, endInstant, cdcEnabled);
-    } else if (hollowSplits.isEmpty()) {
+    if (queryContext.isEmpty()) {
       LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
       return Result.EMPTY;
-    } else {
-      return hollowSplits;
     }
 
+    HoodieTimeline commitTimeline = queryContext.getActiveTimeline();
+    // get the latest instant that satisfies condition
+    final String endInstant = queryContext.getLastInstant();
+    final Option<InstantRange> instantRange = queryContext.getInstantRange();
+
     // version number should be monotonically increasing
     // fetch the instant offset by completion time
-    String offsetToIssue = 
instants.stream().map(HoodieInstant::getCompletionTime).max(String::compareTo).orElse(endInstant);
+    String offsetToIssue = queryContext.getMaxCompletionTime();
 
-    if (instantRange == null) {
+    if (instantRange.isEmpty()) {
       // reading from the earliest, scans the partitions and files directly.
       FileIndex fileIndex = getFileIndex();
 
@@ -311,12 +277,12 @@ public class IncrementalInputSplits implements 
Serializable {
       }
 
       List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
-          fileStatuses, readPartitions, endInstant, null, false);
+          fileStatuses, readPartitions, endInstant, offsetToIssue, null, 
false);
 
       return Result.instance(inputSplits, endInstant, offsetToIssue);
     } else {
-      List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient, 
metaClient.getHadoopConf(), commitTimeline, instants, instantRange, endInstant, 
cdcEnabled);
-      return Result.instance(mergeList(hollowSplits.getInputSplits(), 
inputSplits), endInstant, offsetToIssue);
+      List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient, 
metaClient.getHadoopConf(), commitTimeline, queryContext, instantRange.get(), 
endInstant, cdcEnabled);
+      return Result.instance(inputSplits, endInstant, offsetToIssue);
     }
   }
 
@@ -327,7 +293,7 @@ public class IncrementalInputSplits implements Serializable 
{
       HoodieTableMetaClient metaClient,
       org.apache.hadoop.conf.Configuration hadoopConf,
       HoodieTimeline commitTimeline,
-      List<HoodieInstant> instants,
+      IncrementalQueryAnalyzer.QueryContext queryContext,
       InstantRange instantRange,
       String endInstant,
       boolean cdcEnabled) {
@@ -338,9 +304,10 @@ public class IncrementalInputSplits implements 
Serializable {
     }
     // case2: normal streaming read
     String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+    List<HoodieCommitMetadata> activeMetadataList = 
queryContext.getActiveInstants().stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline)).collect(Collectors.toList());
-    List<HoodieCommitMetadata> archivedMetadataList = 
getArchivedMetadata(metaClient, instantRange, tableName);
+    List<HoodieCommitMetadata> archivedMetadataList = 
queryContext.getArchivedInstants().stream()
+        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, queryContext.getArchivedTimeline())).collect(Collectors.toList());
     if (archivedMetadataList.size() > 0) {
       LOG.warn("\n"
           + 
"--------------------------------------------------------------------------------\n"
@@ -364,74 +331,7 @@ public class IncrementalInputSplits implements 
Serializable {
     }
 
     return getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange, 
skipCompaction);
-  }
-
-  /**
-   * Returns the input splits for 'hollow' instants.
-   */
-  private Result getHollowInputSplits(
-      HoodieTableMetaClient metaClient,
-      org.apache.hadoop.conf.Configuration hadoopConf,
-      @Nullable String issuedInstant,
-      @Nullable String issuedOffset,
-      HoodieTimeline commitTimeline,
-      boolean cdcEnabled) {
-    if (issuedInstant == null || issuedOffset == null) {
-      return Result.EMPTY;
-    }
-    // find the write commit instant that finishes later than the issued 
instant
-    // while with smaller txn start time.
-    List<HoodieInstant> instants = commitTimeline.getInstantsAsStream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, issuedInstant))
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getCompletionTime(), 
GREATER_THAN, issuedOffset))
-        .filter(s -> StreamerUtil.isWriteCommit(metaClient.getTableType(), s, 
commitTimeline)).collect(Collectors.toList());
-    if (instants.isEmpty()) {
-      return Result.EMPTY;
-    }
-    String offsetToIssue = 
instants.stream().map(HoodieInstant::getCompletionTime).max(String::compareTo).orElse(issuedOffset);
-    List<MergeOnReadInputSplit> inputSplits = instants.stream().map(instant -> 
{
-      String instantTs = instant.getTimestamp();
-
-      // Assumes we consume from timeline:
-      //    c0, c1.inflight, c2(issued instant), c3, c4
-      // -> c0, c1,          c2(issued instant), c3, c4
-      // c1, c3 and c4 are the candidate instants,
-
-      // c4 data file could include overlapping records from c2,
-      // use (c2, c4] instant range for c3 and c4,
-
-      // c1 data file could include overlapping records from c0,
-      // use the [c1, c1] instant range for c1.
-      InstantRange instantRange = InstantRange.builder()
-          .startInstant(instantTs)
-          .endInstant(instantTs)
-          .nullableBoundary(cdcEnabled)
-          .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
-      return getIncInputSplits(metaClient, hadoopConf, commitTimeline, 
Collections.singletonList(instant), instantRange, instantTs, cdcEnabled);
-    }).flatMap(Collection::stream).collect(Collectors.toList());
-    return Result.instance(inputSplits, issuedInstant, offsetToIssue);
-  }
-
-  @Nullable
-  private InstantRange getInstantRange(String issuedInstant, String 
instantToIssue, boolean nullableBoundary) {
-    if (issuedInstant != null) {
-      // the streaming reader may record the last issued instant, if the 
issued instant is present,
-      // the instant range should be: (issued instant, the latest instant].
-      return 
InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue)
-          
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
-    } else if 
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
-      // first time consume and has a start commit
-      final String startCommit = 
this.conf.getString(FlinkOptions.READ_START_COMMIT);
-      return startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
-          ? null
-          : 
InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue)
-          
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
-    } else {
-      // first time consume and no start commit, consumes the latest 
incremental data set.
-      return 
InstantRange.builder().startInstant(instantToIssue).endInstant(instantToIssue)
-          
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
-    }
+        fileStatuses, readPartitions, endInstant, 
queryContext.getMaxCompletionTime(), instantRange, skipCompaction);
   }
 
   private List<MergeOnReadInputSplit> getInputSplits(
@@ -440,14 +340,14 @@ public class IncrementalInputSplits implements 
Serializable {
       FileStatus[] fileStatuses,
       Set<String> readPartitions,
       String endInstant,
+      String maxCompletionTime,
       InstantRange instantRange,
       boolean skipBaseFiles) {
     final HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
-    final String maxQueryInstantTime = getMaxQueryInstantTime(fsView, 
endInstant);
     final AtomicInteger cnt = new AtomicInteger(0);
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     return readPartitions.stream()
-        .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, 
maxQueryInstantTime, skipBaseFiles)
+        .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, 
maxCompletionTime, skipBaseFiles)
             .map(fileSlice -> {
               Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
                   .sorted(HoodieLogFile.getLogFileComparator())
@@ -467,29 +367,6 @@ public class IncrementalInputSplits implements 
Serializable {
         .collect(Collectors.toList());
   }
 
-  /**
-   * Returns the upper threshold of query instant time, when NB-CC is enabled, 
imagine the table has actions as follows:
-   *
-   * <pre>
-   *   t1_t5.delta_commit, t2.compaction.inflight, t3_t4.delta_commit
-   * </pre>
-   *
-   * <p>If the user specified query range end instant is t1, when the query is 
executed at t3, the API #getXXXFileSlicesBeforeOrOn
-   * will just ignore delta commit t1_t5 because its end instant t1 is less 
than the latest file slice base instant t2.
-   * We should refactor this out when all the fs view incremental APIs migrate 
to completion time based semantics.
-   *
-   * <p>CAUTION: The query is costly when endInstant is archived.
-   */
-  private String getMaxQueryInstantTime(HoodieTableFileSystemView fsView, 
String endInstant) {
-    if (OptionsResolver.isMorTable(conf)) {
-      Option<String> completionTime = fsView.getCompletionTime(endInstant);
-      if (completionTime.isPresent()) {
-        return completionTime.get();
-      }
-    }
-    return endInstant;
-  }
-
   private List<MergeOnReadInputSplit> getCdcInputSplits(
       HoodieTableMetaClient metaClient,
       InstantRange instantRange) {
@@ -549,103 +426,6 @@ public class IncrementalInputSplits implements 
Serializable {
     return partitions;
   }
 
-  /**
-   * Returns the archived metadata in case the reader consumes untimely or it 
wants
-   * to read from the earliest.
-   *
-   * <p>Note: should improve it with metadata table when the metadata table is 
stable enough.
-   *
-   * @param metaClient     The meta client
-   * @param instantRange   The instant range to filter the timeline instants
-   * @param tableName      The table name
-   * @return the list of archived metadata, or empty if there is no need to 
read the archived timeline
-   */
-  private List<HoodieCommitMetadata> getArchivedMetadata(
-      HoodieTableMetaClient metaClient,
-      InstantRange instantRange,
-      String tableName) {
-    if 
(metaClient.getActiveTimeline().isBeforeTimelineStarts(instantRange.getStartInstant()))
 {
-      // read the archived metadata if the start instant is archived.
-      HoodieTimeline archivedTimeline = getArchivedReadTimeline(metaClient, 
instantRange.getStartInstant());
-      if (!archivedTimeline.empty()) {
-        return archivedTimeline.getInstantsAsStream()
-            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, archivedTimeline)).collect(Collectors.toList());
-      }
-    }
-    return Collections.emptyList();
-  }
-
-  private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
-    HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    return filterInstantsAsPerUserConfigs(timeline);
-  }
-
-  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
-    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
-    HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-    return filterInstantsAsPerUserConfigs(archivedCompleteTimeline);
-  }
-
-  /**
-   * Returns the instants with a given issuedInstant to start from.
-   *
-   * @param commitTimeline The completed commits timeline
-   * @param issuedInstant  The last issued instant that has already been 
delivered to downstream
-   *
-   * @return the filtered hoodie instants
-   */
-  @VisibleForTesting
-  public List<HoodieInstant> filterInstantsWithRange(
-      HoodieTimeline commitTimeline,
-      @Nullable final String issuedInstant) {
-    HoodieTimeline completedTimeline = 
commitTimeline.filterCompletedInstants();
-    if (issuedInstant != null) {
-      // returns early for streaming mode
-      return completedTimeline
-          .getInstantsAsStream()
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN, issuedInstant))
-          .collect(Collectors.toList());
-    }
-
-    Stream<HoodieInstant> instantStream = 
completedTimeline.getInstantsAsStream();
-
-    if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
-      // by default read from the latest commit
-      return 
completedTimeline.lastInstant().map(Collections::singletonList).orElseGet(Collections::emptyList);
-    }
-
-    if (OptionsResolver.isSpecificStartCommit(this.conf)) {
-      final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
-      instantStream = instantStream
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, startCommit));
-    }
-    if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
-      final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
-      instantStream = instantStream.filter(s -> 
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, 
endCommit));
-    }
-    return instantStream.collect(Collectors.toList());
-  }
-
-  /**
-   * Filters out the unnecessary instants as per user specified configs.
-   *
-   * @param timeline The timeline
-   *
-   * @return the filtered timeline
-   */
-  @VisibleForTesting
-  public HoodieTimeline filterInstantsAsPerUserConfigs(HoodieTimeline 
timeline) {
-    final HoodieTimeline oriTimeline = timeline;
-    if (OptionsResolver.isMorTable(this.conf) & this.skipCompaction) {
-      // the compaction commit uses 'commit' as action which is tricky
-      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
-    }
-    if (this.skipClustering) {
-      timeline = timeline.filter(instant -> 
!ClusteringUtil.isClusteringInstant(instant, oriTimeline));
-    }
-    return timeline;
-  }
-
   private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
     if (list1.isEmpty()) {
       return list2;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 86e32fe5a0a..fa911cadb0e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -212,7 +212,7 @@ public class StreamReadMonitoringFunction
       return;
     }
     IncrementalInputSplits.Result result =
-        incrementalInputSplits.inputSplits(metaClient, this.issuedInstant, 
this.issuedOffset, this.cdcEnabled);
+        incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, 
this.cdcEnabled);
     if (result.isEmpty()) {
       // no new instants, returns early
       return;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index ac81b4e7af4..e7ebf44b7d3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -19,25 +19,21 @@
 package org.apache.hudi.util;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.WriteOperationType;
+import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieFlinkTable;
-import 
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
 
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -115,18 +111,4 @@ public class ClusteringUtil {
           commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false));
     }
   }
-
-  /**
-   * Returns whether the given instant {@code instant} is with clustering 
operation.
-   */
-  public static boolean isClusteringInstant(HoodieInstant instant, 
HoodieTimeline timeline) {
-    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-      return false;
-    }
-    try {
-      return TimelineUtils.getCommitMetadata(instant, 
timeline).getOperationType().equals(WriteOperationType.CLUSTER);
-    } catch (IOException e) {
-      throw new HoodieException("Resolve replace commit metadata error for 
instant: " + instant, e);
-    }
-  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4632b6ecb6e..064e59cc751 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -517,7 +518,7 @@ public class StreamerUtil {
   public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant 
instant, HoodieTimeline timeline) {
     return tableType == HoodieTableType.MERGE_ON_READ
         ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a 
compaction
-        : !ClusteringUtil.isClusteringInstant(instant, timeline);   // not a 
clustering
+        : !ClusteringUtils.isClusteringInstant(instant, timeline);   // not a 
clustering
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 29a0326ea41..64211608e05 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -20,12 +20,16 @@ package org.apache.hudi.source;
 
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
@@ -72,67 +76,77 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
 
   @BeforeEach
-  void init() throws IOException {
+  void init() {
     initPath();
-    initMetaClient();
   }
 
   @Test
-  void testFilterInstantsWithRange() {
-    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
+  void testFilterInstantsWithRange() throws IOException {
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
-    IncrementalInputSplits iis = IncrementalInputSplits.builder()
-        .conf(conf)
-        .path(new Path(basePath))
-        .rowType(TestConfigurations.ROW_TYPE)
-        
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
-        .build();
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
 
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "1");
     HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "2");
     HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "3");
     timeline.createCompleteInstant(commit1);
     timeline.createCompleteInstant(commit2);
     timeline.createCompleteInstant(commit3);
-    timeline = timeline.reload();
+    timeline = metaClient.reloadActiveTimeline();
+
+    Map<String, String> completionTimeMap = 
timeline.filterCompletedInstants().getInstantsAsStream()
+        .collect(Collectors.toMap(HoodieInstant::getTimestamp, 
HoodieInstant::getCompletionTime));
 
+    IncrementalQueryAnalyzer analyzer1 = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED)
+        .startTime(completionTimeMap.get("1"))
+        .skipClustering(true)
+        .build();
     // previous read iteration read till instant time "1", next read iteration 
should return ["2", "3"]
-    List<HoodieInstant> instantRange2 = iis.filterInstantsWithRange(timeline, 
"1");
-    assertEquals(2, instantRange2.size());
-    assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);
+    List<HoodieInstant> activeInstants1 = 
analyzer1.analyze().getActiveInstants();
+    assertEquals(2, activeInstants1.size());
+    assertIterableEquals(Arrays.asList(commit2, commit3), activeInstants1);
 
     // simulate first iteration cycle with read from the LATEST commit
-    List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline, 
null);
-    assertEquals(1, instantRange1.size());
-    assertIterableEquals(Collections.singletonList(commit3), instantRange1);
+    IncrementalQueryAnalyzer analyzer2 = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+        .skipClustering(true)
+        .build();
+    List<HoodieInstant> activeInstants2 = 
analyzer2.analyze().getActiveInstants();
+    assertEquals(1, activeInstants2.size());
+    assertIterableEquals(Collections.singletonList(commit3), activeInstants2);
 
     // specifying a start and end commit
-    conf.set(FlinkOptions.READ_START_COMMIT, "1");
-    conf.set(FlinkOptions.READ_END_COMMIT, "3");
-    List<HoodieInstant> instantRange3 = iis.filterInstantsWithRange(timeline, 
null);
-    assertEquals(3, instantRange3.size());
-    assertIterableEquals(Arrays.asList(commit1, commit2, commit3), 
instantRange3);
+    IncrementalQueryAnalyzer analyzer3 = IncrementalQueryAnalyzer.builder()
+        .metaClient(metaClient)
+        .rangeType(InstantRange.RangeType.CLOSED_CLOSED)
+        .startTime(completionTimeMap.get("1"))
+        .endTime(completionTimeMap.get("3"))
+        .skipClustering(true)
+        .build();
+    List<HoodieInstant> activeInstants3 = 
analyzer3.analyze().getActiveInstants();
+    assertEquals(3, activeInstants3.size());
+    assertIterableEquals(Arrays.asList(commit1, commit2, commit3), 
activeInstants3);
 
     // add an inflight instant which should be excluded
     HoodieInstant commit4 = new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMPACTION_ACTION, "4");
     timeline.createNewInstant(commit4);
-    timeline = timeline.reload();
+    timeline = metaClient.reloadActiveTimeline();
     assertEquals(4, timeline.getInstants().size());
-    List<HoodieInstant> instantRange4 = iis.filterInstantsWithRange(timeline, 
null);
-    assertEquals(3, instantRange4.size());
+    List<HoodieInstant> activeInstants4 = 
analyzer3.analyze().getActiveInstants();
+    assertEquals(3, activeInstants4.size());
   }
 
   @Test
   void testFilterInstantsByCondition() throws IOException {
-    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
-    IncrementalInputSplits iis = IncrementalInputSplits.builder()
-            .conf(conf)
-            .path(new Path(basePath))
-            .rowType(TestConfigurations.ROW_TYPE)
-            .build();
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
 
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "1");
     HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "2");
     HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, "3");
@@ -153,15 +167,16 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
     timeline = timeline.reload();
 
     conf.set(FlinkOptions.READ_END_COMMIT, "3");
-    HoodieTimeline resTimeline = iis.filterInstantsAsPerUserConfigs(timeline);
+    HoodieTimeline resTimeline = 
IncrementalQueryAnalyzer.filterInstantsAsPerUserConfigs(metaClient, timeline, 
false, false);
     // will not filter cluster commit by default
     assertEquals(3, resTimeline.getInstants().size());
   }
 
   @Test
   void testInputSplitsSortedByPartition() throws Exception {
-    HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true);
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+
     // To enable a full table scan
     conf.set(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -171,7 +186,7 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
         .path(new Path(basePath))
         .rowType(TestConfigurations.ROW_TYPE)
         .build();
-    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
null, false);
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
false);
     List<String> partitions = getFilteredPartitions(result);
     assertEquals(Arrays.asList("par1", "par2", "par3", "par4", "par5", 
"par6"), partitions);
   }
@@ -183,6 +198,8 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
       List<String> expectedPartitions) throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+
     List<RowData> testData = new ArrayList<>();
     
testData.addAll(TestData.DATA_SET_INSERT.stream().collect(Collectors.toList()));
     
testData.addAll(TestData.DATA_SET_INSERT_PARTITION_IS_NULL.stream().collect(Collectors.toList()));
@@ -199,13 +216,14 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
         .rowType(TestConfigurations.ROW_TYPE)
         .partitionPruner(partitionPruner)
         .build();
-    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
null, false);
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
false);
     List<String> partitions = getFilteredPartitions(result);
     assertEquals(expectedPartitions, partitions);
   }
 
   @Test
   void testInputSplitsWithSpeedLimit() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     conf.set(FlinkOptions.READ_AS_STREAMING, true);
     conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
@@ -226,21 +244,22 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
             .rowType(TestConfigurations.ROW_TYPE)
             .partitionPruner(null)
             .build();
-    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, 
firstInstant.getTimestamp(), firstInstant.getCompletionTime(), false);
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, 
firstInstant.getCompletionTime(), false);
 
     String minStartCommit = result.getInputSplits().stream()
-            .map(split -> split.getInstantRange().get().getStartInstant())
+            .map(split -> 
split.getInstantRange().get().getStartInstant().get())
             .min((commit1,commit2) -> 
HoodieTimeline.compareTimestamps(commit1, LESSER_THAN, commit2) ? 1 : 0)
             .orElse(null);
     String maxEndCommit = result.getInputSplits().stream()
-            .map(split -> split.getInstantRange().get().getEndInstant())
+            .map(split -> split.getInstantRange().get().getEndInstant().get())
             .max((commit1,commit2) -> 
HoodieTimeline.compareTimestamps(commit1, GREATER_THAN, commit2) ? 1 : 0)
             .orElse(null);
-    assertEquals(1, intervalBetween2Instants(commitsTimeline, minStartCommit, 
maxEndCommit));
+    assertEquals(0, intervalBetween2Instants(commitsTimeline, minStartCommit, 
maxEndCommit), "Should read 1 instant");
   }
 
   @Test
   void testInputSplitsForSplitLastCommit() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
     conf.set(FlinkOptions.READ_AS_STREAMING, true);
     conf.set(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
@@ -271,7 +290,7 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
             .rowType(TestConfigurations.ROW_TYPE)
             .partitionPruner(null)
             .build();
-    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
null, false);
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
false);
     result.getInputSplits().stream().filter(split -> 
fileIdToBaseInstant.containsKey(split.getFileId()))
             .forEach(split -> 
assertEquals(fileIdToBaseInstant.get(split.getFileId()), 
split.getLatestCommit()));
     assertTrue(result.getInputSplits().stream().anyMatch(split -> 
split.getLatestCommit().equals(lastInstant)),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 6edeceae0d8..5602ddaa8e3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -415,8 +415,8 @@ public class TestStreamReadMonitoringFunction {
 
   private static boolean isPointInstantRange(InstantRange instantRange, String 
timestamp) {
     return instantRange != null
-        && Objects.equals(timestamp, instantRange.getStartInstant())
-        && Objects.equals(timestamp, instantRange.getEndInstant());
+        && Objects.equals(timestamp, instantRange.getStartInstant().get())
+        && Objects.equals(timestamp, instantRange.getEndInstant().get());
   }
 
   private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> 
createHarness(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 5186d68c2cb..b8d0ea6f29e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -356,7 +356,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits = 
incrementalInputSplits.inputSplits(metaClient, null, null, true);
+    IncrementalInputSplits.Result splits = 
incrementalInputSplits.inputSplits(metaClient, null, true);
 
     List<RowData> result = readData(inputFormat, 
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -398,7 +398,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits = 
incrementalInputSplits.inputSplits(metaClient, null, null, true);
+    IncrementalInputSplits.Result splits = 
incrementalInputSplits.inputSplits(metaClient, null, true);
 
     List<RowData> result = readData(inputFormat, 
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -438,7 +438,7 @@ public class TestInputFormat {
 
     // default read the latest commit
     // the compaction base files are skipped
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -454,7 +454,7 @@ public class TestInputFormat {
     String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, 
HoodieTimeline.COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual2 = TestData.rowDataToString(result2);
@@ -470,7 +470,7 @@ public class TestInputFormat {
     inputFormat = this.tableSource.getInputFormat(true);
 
     // filter out the last commit by partition pruning
-    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits3.isEmpty());
     List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual3 = TestData.rowDataToString(result3);
@@ -503,7 +503,7 @@ public class TestInputFormat {
 
     // default read the latest commit
     // the clustering files are skipped
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -519,7 +519,7 @@ public class TestInputFormat {
     String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual2 = TestData.rowDataToString(result2);
@@ -536,7 +536,7 @@ public class TestInputFormat {
     inputFormat = this.tableSource.getInputFormat(true);
 
     // filter out the last commit by partition pruning
-    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits3.isEmpty());
     List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual3 = TestData.rowDataToString(result3);
@@ -574,7 +574,9 @@ public class TestInputFormat {
     List<HoodieInstant> instants = 
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
     assertThat(instants.size(), is(2));
 
-    String c4 = instants.get(1).getTimestamp();
+    String c2 = oriInstants.get(1).getTimestamp();
+    String c3 = oriInstants.get(2).getTimestamp();
+    String c4 = oriInstants.get(3).getTimestamp();
 
     InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
     assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
@@ -587,7 +589,7 @@ public class TestInputFormat {
 
     // timeline: c1, c2.inflight, c3.inflight, c4
     // default read the latest commit
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     TestData.assertRowDataEquals(result1, TestData.dataSetInsert(7, 8));
@@ -595,7 +597,7 @@ public class TestInputFormat {
     // timeline: c1, c2.inflight, c3.inflight, c4
     // -> c1
     conf.setString(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2, 7, 8));
@@ -604,13 +606,13 @@ public class TestInputFormat {
     // c4 -> c2
     TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(1), 
metadataList.get(0)); // complete c2
     assertThat(splits2.getEndInstant(), is(c4));
-    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, splits2.getEndInstant(), 
splits2.getOffset(), false);
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, splits2.getOffset(), false);
     assertFalse(splits3.isEmpty());
     List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     TestData.assertRowDataEquals(result3, TestData.dataSetInsert(3, 4));
 
     // test c2 and c4, c2 completion time > c1, so it is not a hollow instant
-    IncrementalInputSplits.Result splits4 = 
incrementalInputSplits.inputSplits(metaClient, 
oriInstants.get(0).getTimestamp(), oriInstants.get(0).getCompletionTime(), 
false);
+    IncrementalInputSplits.Result splits4 = 
incrementalInputSplits.inputSplits(metaClient, 
oriInstants.get(0).getCompletionTime(), false);
     assertFalse(splits4.isEmpty());
     List<RowData> result4 = readData(inputFormat, 
splits4.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     TestData.assertRowDataEquals(result4, TestData.dataSetInsert(3, 4, 7, 8));
@@ -618,23 +620,23 @@ public class TestInputFormat {
     // timeline: c1, c2, c3, c4
     // c4 -> c3
     TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(2), 
metadataList.get(1)); // complete c3
-    assertThat(splits3.getEndInstant(), is(c4));
-    IncrementalInputSplits.Result splits5 = 
incrementalInputSplits.inputSplits(metaClient, splits3.getEndInstant(), 
splits3.getOffset(), false);
+    assertThat(splits3.getEndInstant(), is(c2));
+    IncrementalInputSplits.Result splits5 = 
incrementalInputSplits.inputSplits(metaClient, splits3.getOffset(), false);
     assertFalse(splits5.isEmpty());
     List<RowData> result5 = readData(inputFormat, 
splits5.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     TestData.assertRowDataEquals(result5, TestData.dataSetInsert(5, 6));
 
     // c4 ->
-    assertThat(splits5.getEndInstant(), is(c4));
-    IncrementalInputSplits.Result splits6 = 
incrementalInputSplits.inputSplits(metaClient, splits5.getEndInstant(), 
splits5.getOffset(), false);
+    assertThat(splits5.getEndInstant(), is(c3));
+    IncrementalInputSplits.Result splits6 = 
incrementalInputSplits.inputSplits(metaClient, splits5.getOffset(), false);
     assertTrue(splits6.isEmpty());
 
-    // test c2 and c4, c2 is recognized as a hollow instant
+    // test c2, c3 and c4, c2 and c3 are recognized as hollow instants
     // the (version_number, completion_time) pair is not consistent, just for 
test purposes
-    IncrementalInputSplits.Result splits7 = 
incrementalInputSplits.inputSplits(metaClient, 
oriInstants.get(2).getTimestamp(), oriInstants.get(3).getCompletionTime(), 
false);
+    IncrementalInputSplits.Result splits7 = 
incrementalInputSplits.inputSplits(metaClient, 
oriInstants.get(3).getCompletionTime(), false);
     assertFalse(splits7.isEmpty());
     List<RowData> result7 = readData(inputFormat, 
splits7.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
-    TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 7, 8));
+    TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 5, 6));
   }
 
   @Test
@@ -656,7 +658,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits1.isEmpty());
     List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -671,7 +673,7 @@ public class TestInputFormat {
     String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1, 
HoodieTimeline.COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     String actual2 = TestData.rowDataToString(result2);
@@ -780,7 +782,7 @@ public class TestInputFormat {
 
     HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), 
HadoopConfigurations.getHadoopConf(conf));
     List<String> commits = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+        .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
 
     assertThat(commits.size(), is(3));
 
@@ -864,7 +866,7 @@ public class TestInputFormat {
 
     HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), 
HadoopConfigurations.getHadoopConf(conf));
     List<String> commits = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+        .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
 
     assertThat(commits.size(), is(3));
 
@@ -921,8 +923,9 @@ public class TestInputFormat {
     assertThat(inputFormat5, instanceOf(CdcInputFormat.class));
 
     List<RowData> actual5 = readData(inputFormat5);
-    final List<RowData> expected5 = TestData.dataSetInsert(1, 2);
-    TestData.assertRowDataEquals(actual5, expected5);
+    // even though the start commit "000" is out of range, after instants 
filtering,
+    // it detects that all the instants are still active, so the intermediate 
changes get returned.
+    TestData.assertRowDataEquals(actual5, expected3);
 
     // start and end commit: both are out of range
     conf.setString(FlinkOptions.READ_START_COMMIT, "001");
@@ -1013,12 +1016,12 @@ public class TestInputFormat {
 
     HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), 
HadoopConfigurations.getHadoopConf(conf));
     List<String> commits = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
-        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+        .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
 
     assertThat(commits.size(), is(4));
 
     List<String> archivedCommits = 
metaClient.getArchivedTimeline().getCommitsTimeline().filterCompletedInstants()
-        
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+        
.getInstantsAsStream().map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
 
     assertThat(archivedCommits.size(), is(6));
 
@@ -1137,9 +1140,7 @@ public class TestInputFormat {
     assertThat(firstCommit.get().getAction(), 
is(HoodieTimeline.DELTA_COMMIT_ACTION));
 
     java.nio.file.Path metaFilePath = Paths.get(metaClient.getMetaPath(), 
firstCommit.get().getFileName());
-    TestUtils.amendCompletionTimeToLatest(metaClient, metaFilePath, 
firstCommit.get().getTimestamp());
-
-    String compactionInstant = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().map(HoodieInstant::getTimestamp).get();
+    String newCompletionTime = 
TestUtils.amendCompletionTimeToLatest(metaClient, metaFilePath, 
firstCommit.get().getTimestamp());
 
     InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
     assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
@@ -1150,8 +1151,8 @@ public class TestInputFormat {
         .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
         .skipCompaction(skipCompaction)
         .build();
-    conf.setString(FlinkOptions.READ_END_COMMIT, compactionInstant);
-    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+    conf.setString(FlinkOptions.READ_END_COMMIT, newCompletionTime);
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, null, false);
     assertFalse(splits2.isEmpty());
     List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
     TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 611eb889f8d..f4549eb669d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -138,11 +138,13 @@ public class TestUtils {
         serializeCommitMetadata(metadata));
   }
 
-  public static void amendCompletionTimeToLatest(HoodieTableMetaClient 
metaClient, java.nio.file.Path sourcePath, String instantTime) throws 
IOException {
+  public static String amendCompletionTimeToLatest(HoodieTableMetaClient 
metaClient, java.nio.file.Path sourcePath, String instantTime) throws 
IOException {
     String fileExt = sourcePath.getFileName().toString().split("\\.")[1];
-    String newFileName = instantTime + "_" + metaClient.createNewInstantTime() 
+ "." + fileExt;
+    String newCompletionTime = metaClient.createNewInstantTime();
+    String newFileName = instantTime + "_" + newCompletionTime + "." + fileExt;
 
     java.nio.file.Path newFilePath = 
sourcePath.getParent().resolve(newFileName);
     Files.move(sourcePath, newFilePath);
+    return newCompletionTime;
   }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java 
b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
index 957dab28e2c..42fd98bdd01 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/Option.java
@@ -84,6 +84,10 @@ public final class Option<T> implements Serializable {
     return null != val;
   }
 
+  public boolean isEmpty() {
+    return null == val;
+  }
+
   public T get() {
     if (null == val) {
       throw new NoSuchElementException("No value present in Option");
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index afccd43ca3e..311383a9c32 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -77,7 +77,7 @@ class CDCRelation(
         .startInstant(startInstant)
         .endInstant(endInstant)
         .nullableBoundary(true)
-        .rangeType(InstantRange.RangeType.OPEN_CLOSE).build())
+        .rangeType(InstantRange.RangeType.OPEN_CLOSED).build())
 
   override final def needConversion: Boolean = false
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
index 57633098b25..87d136029e5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/LSMTimelineReadBenchmark.scala
@@ -94,7 +94,7 @@ object LSMTimelineReadBenchmark extends HoodieBenchmarkBase {
         }
       }
       benchmark.addCase("read start time") { _ =>
-        new CompletionTimeQueryView(metaClient).getStartTimeSet(startTs + 1 + 
1000 + "", startTs + commitsNum + 1000 + "", earliestStartTimeFunc)
+        new CompletionTimeQueryView(metaClient).getStartTimes(startTs + 1 + 
1000 + "", startTs + commitsNum + 1000 + "", earliestStartTimeFunc)
       }
       benchmark.run()
       val totalSize = 
LSMTimeline.latestSnapshotManifest(metaClient).getFiles.asScala

Reply via email to