This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3354fac  [HUDI-2449] Incremental read for Flink (#3686)
3354fac is described below

commit 3354fac42f9a2c4dbc8ac73ca4749160e9b9459b
Author: Danny Chan <[email protected]>
AuthorDate: Sun Sep 19 09:06:46 2021 +0800

    [HUDI-2449] Incremental read for Flink (#3686)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  14 +-
 .../java/org/apache/hudi/source/FileIndex.java     |  23 +-
 ...ngFunction.java => IncrementalInputSplits.java} | 376 +++++++++------------
 .../hudi/source/StreamReadMonitoringFunction.java  | 211 +-----------
 .../org/apache/hudi/table/HoodieTableFactory.java  |  12 +
 .../org/apache/hudi/table/HoodieTableSource.java   | 213 ++++++------
 .../table/format/mor/MergeOnReadInputFormat.java   |  29 +-
 .../java/org/apache/hudi/source/TestFileIndex.java |  14 +
 .../source/TestStreamReadMonitoringFunction.java   |   4 +-
 .../apache/hudi/source/TestStreamReadOperator.java |   4 -
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  35 +-
 .../apache/hudi/table/TestHoodieTableFactory.java  |  26 ++
 .../apache/hudi/table/TestHoodieTableSource.java   |  10 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  81 ++++-
 .../test/java/org/apache/hudi/utils/TestData.java  |  10 +-
 .../test/java/org/apache/hudi/utils/TestUtils.java |   3 +-
 16 files changed, 480 insertions(+), 585 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3a2e615..c736821 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -197,12 +197,18 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Check interval for streaming read of SECOND, default 1 
minute");
 
   public static final String START_COMMIT_EARLIEST = "earliest";
-  public static final ConfigOption<String> READ_STREAMING_START_COMMIT = 
ConfigOptions
-      .key("read.streaming.start-commit")
+  public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
+      .key("read.start-commit")
       .stringType()
       .noDefaultValue()
-      .withDescription("Start commit instant for streaming read, the commit 
time format should be 'yyyyMMddHHmmss', "
-          + "by default reading from the latest instant");
+      .withDescription("Start commit instant for reading, the commit time 
format should be 'yyyyMMddHHmmss', "
+          + "by default reading from the latest instant for streaming read");
+
+  public static final ConfigOption<String> READ_END_COMMIT = ConfigOptions
+      .key("read.end-commit")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("End commit instant for reading, the commit time format 
should be 'yyyyMMddHHmmss'");
 
   // ------------------------------------------------------------------------
   //  Write Options
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index be02fc4..f1abf4b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -28,6 +28,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +38,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * A file index which supports listing files efficiently through metadata 
table.
@@ -138,10 +141,28 @@ public class FileIndex {
   }
 
   // -------------------------------------------------------------------------
+  //  Getter/Setter
+  // -------------------------------------------------------------------------
+
+  /**
+   * Sets up explicit partition paths for pruning.
+   */
+  public void setPartitionPaths(@Nullable Set<String> partitionPaths) {
+    if (partitionPaths != null) {
+      this.partitionPaths = new ArrayList<>(partitionPaths);
+    }
+  }
+
+  // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private List<String> getOrBuildPartitionPaths() {
+  /**
+   * Returns all the relative partition paths.
+   *
+   * <p>The partition paths are cached once invoked.
+   */
+  public List<String> getOrBuildPartitionPaths() {
     if (this.partitionPaths != null) {
       return this.partitionPaths;
     }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
similarity index 53%
copy from 
hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
copy to 
hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index c5610d2..7d31920 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,23 +28,12 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
-import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,180 +44,101 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 
 /**
- * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
- * , it is responsible for:
+ * Utilities to generate incremental input splits {@link 
MergeOnReadInputSplit}.
+ * The input splits are used for streaming and incremental read.
  *
+ * <p>How to generate the input splits:
  * <ol>
- *     <li>Monitoring a user-provided hoodie table path.</li>
- *     <li>Deciding which files(or split) should be further read and 
processed.</li>
- *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
- *     <li>Assigning them to downstream tasks for further processing.</li>
+ *   <li>first fetch all the commit metadata for the incremental instants;</li>
+ *   <li>resolve the incremental commit file paths;</li>
+ *   <li>filter the full file paths by required partitions;</li>
+ *   <li>use the file paths from step 3 as the back-up of the filesystem 
view.</li>
  * </ol>
- *
- * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
- * which can have parallelism greater than one.
- *
- * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
- * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
- * among the downstream tasks.
  */
-public class StreamReadMonitoringFunction
-    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
-
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The path to monitor.
-   */
-  private final Path path;
-
-  /**
-   * The interval between consecutive path scans.
-   */
-  private final long interval;
-
-  private transient Object checkpointLock;
-
-  private volatile boolean isRunning = true;
-
-  private String issuedInstant;
-
-  private transient ListState<String> instantState;
-
+public class IncrementalInputSplits {
+  private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalInputSplits.class);
   private final Configuration conf;
-
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
-  private HoodieTableMetaClient metaClient;
-
+  private final Path path;
   private final long maxCompactionMemoryInBytes;
-
   // for partition pruning
-  private final Set<String> requiredPartitionPaths;
+  private final Set<String> requiredPartitions;
 
-  public StreamReadMonitoringFunction(
+  private IncrementalInputSplits(
       Configuration conf,
       Path path,
       long maxCompactionMemoryInBytes,
-      Set<String> requiredPartitionPaths) {
+      @Nullable Set<String> requiredPartitions) {
     this.conf = conf;
     this.path = path;
-    this.interval = 
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-    this.requiredPartitionPaths = requiredPartitionPaths;
+    this.requiredPartitions = requiredPartitions;
   }
 
-  @Override
-  public void initializeState(FunctionInitializationContext context) throws 
Exception {
-
-    ValidationUtils.checkState(this.instantState == null,
-        "The " + getClass().getSimpleName() + " has already been 
initialized.");
-
-    this.instantState = context.getOperatorStateStore().getListState(
-        new ListStateDescriptor<>(
-            "file-monitoring-state",
-            StringSerializer.INSTANCE
-        )
-    );
-
-    if (context.isRestored()) {
-      LOG.info("Restoring state for the class {} with table {} and base path 
{}.",
-          getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME), 
path);
-
-      List<String> retrievedStates = new ArrayList<>();
-      for (String entry : this.instantState.get()) {
-        retrievedStates.add(entry);
-      }
-
-      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
-          getClass().getSimpleName() + " retrieved invalid state.");
-
-      if (retrievedStates.size() == 1 && issuedInstant != null) {
-        // this is the case where we have both legacy and new state.
-        // the two should be mutually exclusive for the operator, thus we 
throw the exception.
-
-        throw new IllegalArgumentException(
-            "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
-
-      } else if (retrievedStates.size() == 1) {
-        this.issuedInstant = retrievedStates.get(0);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} retrieved a issued instant of time {} for table {} 
with path {}.",
-              getClass().getSimpleName(), issuedInstant, 
conf.get(FlinkOptions.TABLE_NAME), path);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    super.open(parameters);
-    this.hadoopConf = StreamerUtil.getHadoopConf();
-  }
-
-  @Override
-  public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) 
throws Exception {
-    checkpointLock = context.getCheckpointLock();
-    while (isRunning) {
-      synchronized (checkpointLock) {
-        monitorDirAndForwardSplits(context);
-      }
-      TimeUnit.SECONDS.sleep(interval);
-    }
+  /**
+   * Returns the builder.
+   */
+  public static Builder builder() {
+    return new Builder();
   }
 
-  @Nullable
-  private HoodieTableMetaClient getOrCreateMetaClient() {
-    if (this.metaClient != null) {
-      return this.metaClient;
-    }
-    if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
-      this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), 
hadoopConf);
-      return this.metaClient;
-    }
-    // fallback
-    return null;
+  /**
+   * Returns the incremental input splits.
+   *
+   * @param metaClient The meta client
+   * @param hadoopConf The hadoop configuration
+   * @return The list of incremental input splits or empty if there are no new 
instants
+   */
+  public Result inputSplits(
+      HoodieTableMetaClient metaClient,
+      org.apache.hadoop.conf.Configuration hadoopConf) {
+    return inputSplits(metaClient, hadoopConf, null);
   }
 
-  @VisibleForTesting
-  public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> 
context) {
-    HoodieTableMetaClient metaClient = getOrCreateMetaClient();
-    if (metaClient == null) {
-      // table does not exist
-      return;
-    }
+  /**
+   * Returns the incremental input splits.
+   *
+   * @param metaClient    The meta client
+   * @param hadoopConf    The hadoop configuration
+   * @param issuedInstant The last issued instant, only valid in streaming read
+   * @return The list of incremental input splits or empty if there are no new 
instants
+   */
+  public Result inputSplits(
+      HoodieTableMetaClient metaClient,
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      String issuedInstant) {
     metaClient.reloadActiveTimeline();
     HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
-      return;
+      return Result.EMPTY;
     }
-    List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, 
this.issuedInstant);
+    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, 
issuedInstant);
     // get the latest instant that satisfies condition
     final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
     final InstantRange instantRange;
     if (instantToIssue != null) {
-      if (this.issuedInstant != null) {
-        // had already consumed an instant
-        instantRange = InstantRange.getInstance(this.issuedInstant, 
instantToIssue.getTimestamp(),
+      if (issuedInstant != null) {
+        // the streaming reader may record the last issued instant; if the 
issued instant is present,
+        // the instant range should be: (issued instant, the latest instant].
+        instantRange = InstantRange.getInstance(issuedInstant, 
instantToIssue.getTimestamp(),
             InstantRange.RangeType.OPEN_CLOSE);
-      } else if 
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
+      } else if 
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
         // first time consume and has a start commit
-        final String specifiedStart = 
this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
-        instantRange = 
specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
+        final String startCommit = 
this.conf.getString(FlinkOptions.READ_START_COMMIT);
+        instantRange = 
startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
             ? null
-            : InstantRange.getInstance(specifiedStart, 
instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
+            : InstantRange.getInstance(startCommit, 
instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
       } else {
         // first time consume and no start commit, consumes the latest 
incremental data set.
         instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), 
instantToIssue.getTimestamp(),
@@ -236,18 +146,13 @@ public class StreamReadMonitoringFunction
       }
     } else {
       LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
-      return;
+      return Result.EMPTY;
     }
-    // generate input split:
-    // 1. first fetch all the commit metadata for the incremental instants;
-    // 2. filter the relative partition paths
-    // 3. filter the full file paths
-    // 4. use the file paths from #step 3 as the back-up of the filesystem view
 
     String tableName = conf.getString(FlinkOptions.TABLE_NAME);
     List<HoodieCommitMetadata> activeMetadataList = instants.stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline)).collect(Collectors.toList());
-    List<HoodieCommitMetadata> archivedMetadataList = 
getArchivedMetadata(instantRange, commitTimeline, tableName);
+    List<HoodieCommitMetadata> archivedMetadataList = 
getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
     if (archivedMetadataList.size() > 0) {
       LOG.warn(""
           + 
"--------------------------------------------------------------------------------\n"
@@ -261,22 +166,22 @@ public class StreamReadMonitoringFunction
 
     Set<String> writePartitions = getWritePartitionPaths(metadataList);
     // apply partition push down
-    if (this.requiredPartitionPaths.size() > 0) {
+    if (this.requiredPartitions != null) {
       writePartitions = writePartitions.stream()
-          
.filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
+          
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
     }
     FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, 
hadoopConf, metadataList);
     if (fileStatuses.length == 0) {
       LOG.warn("No files found for reading in user provided path.");
-      return;
+      return Result.EMPTY;
     }
 
     HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
-    final String commitToIssue = instantToIssue.getTimestamp();
+    final String endInstant = instantToIssue.getTimestamp();
     final AtomicInteger cnt = new AtomicInteger(0);
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
-        .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
+        .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
             .map(fileSlice -> {
               Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
                   .sorted(HoodieLogFile.getLogFileComparator())
@@ -284,64 +189,12 @@ public class StreamReadMonitoringFunction
                   .collect(Collectors.toList()));
               String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
               return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  basePath, logPaths, commitToIssue,
+                  basePath, logPaths, endInstant,
                   metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, instantRange);
             }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
-
-    for (MergeOnReadInputSplit split : inputSplits) {
-      context.collect(split);
-    }
-    // update the issues instant time
-    this.issuedInstant = commitToIssue;
-    LOG.info(""
-        + "------------------------------------------------------------\n"
-        + "---------- consumed to instant: {}\n"
-        + "------------------------------------------------------------",
-        commitToIssue);
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-
-    if (checkpointLock != null) {
-      synchronized (checkpointLock) {
-        issuedInstant = null;
-        isRunning = false;
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Closed File Monitoring Source for path: " + path + ".");
-    }
-  }
-
-  @Override
-  public void cancel() {
-    if (checkpointLock != null) {
-      // this is to cover the case where cancel() is called before the run()
-      synchronized (checkpointLock) {
-        issuedInstant = null;
-        isRunning = false;
-      }
-    } else {
-      issuedInstant = null;
-      isRunning = false;
-    }
-  }
-
-  // -------------------------------------------------------------------------
-  //  Checkpointing
-  // -------------------------------------------------------------------------
-
-  @Override
-  public void snapshotState(FunctionSnapshotContext context) throws Exception {
-    this.instantState.clear();
-    if (this.issuedInstant != null) {
-      this.instantState.add(this.issuedInstant);
-    }
+    return Result.instance(inputSplits, endInstant);
   }
 
   /**
@@ -350,12 +203,14 @@ public class StreamReadMonitoringFunction
    *
    * <p>Note: should improve it with metadata table when the metadata table is 
stable enough.
    *
+   * @param metaClient     The meta client
    * @param instantRange   The instant range to filter the timeline instants
    * @param commitTimeline The commit timeline
    * @param tableName      The table name
    * @return the list of archived metadata, or empty if there is no need to 
read the archived timeline
    */
   private List<HoodieCommitMetadata> getArchivedMetadata(
+      HoodieTableMetaClient metaClient,
       InstantRange instantRange,
       HoodieTimeline commitTimeline,
       String tableName) {
@@ -389,23 +244,30 @@ public class StreamReadMonitoringFunction
    * @param issuedInstant  The last issued instant that has already been 
delivered to downstream
    * @return the filtered hoodie instants
    */
-  private List<HoodieInstant> filterInstantsWithStart(
+  private List<HoodieInstant> filterInstantsWithRange(
       HoodieTimeline commitTimeline,
       final String issuedInstant) {
     HoodieTimeline completedTimeline = 
commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
+      // returns early for streaming mode
       return completedTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
-    } else if 
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
-        && 
!this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST))
 {
-      String definedStartCommit = 
this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
-      return completedTimeline.getInstants()
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, definedStartCommit))
-          .collect(Collectors.toList());
-    } else {
-      return completedTimeline.getInstants().collect(Collectors.toList());
     }
+
+    Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
+
+    if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
+        && 
!this.conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST))
 {
+      final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
+      instantStream = instantStream
+          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, startCommit));
+    }
+    if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
+      final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
+      instantStream = instantStream.filter(s -> 
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, 
endCommit));
+    }
+    return instantStream.collect(Collectors.toList());
   }
 
   /**
@@ -426,4 +288,78 @@ public class StreamReadMonitoringFunction
     merged.addAll(list2);
     return merged;
   }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * Represents a result of calling {@link #inputSplits}.
+   */
+  public static class Result {
+    private final List<MergeOnReadInputSplit> inputSplits; // input splits
+    private final String endInstant; // end instant to consume to
+
+    public static final Result EMPTY = instance(Collections.emptyList(), "");
+
+    public boolean isEmpty() {
+      return this.inputSplits.size() == 0;
+    }
+
+    public List<MergeOnReadInputSplit> getInputSplits() {
+      return this.inputSplits;
+    }
+
+    public String getEndInstant() {
+      return this.endInstant;
+    }
+
+    private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant) 
{
+      this.inputSplits = inputSplits;
+      this.endInstant = endInstant;
+    }
+
+    public static Result instance(List<MergeOnReadInputSplit> inputSplits, 
String endInstant) {
+      return new Result(inputSplits, endInstant);
+    }
+  }
+
+  /**
+   * Builder for {@link IncrementalInputSplits}.
+   */
+  public static class Builder {
+    private Configuration conf;
+    private Path path;
+    private long maxCompactionMemoryInBytes;
+    // for partition pruning
+    private Set<String> requiredPartitions;
+
+    public Builder() {
+    }
+
+    public Builder conf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder path(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) 
{
+      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+      return this;
+    }
+
+    public Builder requiredPartitions(@Nullable Set<String> 
requiredPartitions) {
+      this.requiredPartitions = requiredPartitions;
+      return this;
+    }
+
+    public IncrementalInputSplits build() {
+      return new IncrementalInputSplits(Objects.requireNonNull(this.conf), 
Objects.requireNonNull(this.path),
+          this.maxCompactionMemoryInBytes, this.requiredPartitions);
+    }
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index c5610d2..bfd7452 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -18,19 +18,9 @@
 
 package org.apache.hudi.source;
 
-import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.InstantRange;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -45,24 +35,15 @@ import 
org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 
 /**
  * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
@@ -112,21 +93,21 @@ public class StreamReadMonitoringFunction
 
   private HoodieTableMetaClient metaClient;
 
-  private final long maxCompactionMemoryInBytes;
-
-  // for partition pruning
-  private final Set<String> requiredPartitionPaths;
+  private final IncrementalInputSplits incrementalInputSplits;
 
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
       long maxCompactionMemoryInBytes,
-      Set<String> requiredPartitionPaths) {
+      @Nullable Set<String> requiredPartitionPaths) {
     this.conf = conf;
     this.path = path;
     this.interval = 
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
-    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-    this.requiredPartitionPaths = requiredPartitionPaths;
+    this.incrementalInputSplits = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(path)
+        .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
+        .requiredPartitions(requiredPartitionPaths).build();
   }
 
   @Override
@@ -208,98 +189,23 @@ public class StreamReadMonitoringFunction
       // table does not exist
       return;
     }
-    metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    if (commitTimeline.empty()) {
-      LOG.warn("No splits found for the table under path " + path);
-      return;
-    }
-    List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, 
this.issuedInstant);
-    // get the latest instant that satisfies condition
-    final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
-    final InstantRange instantRange;
-    if (instantToIssue != null) {
-      if (this.issuedInstant != null) {
-        // had already consumed an instant
-        instantRange = InstantRange.getInstance(this.issuedInstant, 
instantToIssue.getTimestamp(),
-            InstantRange.RangeType.OPEN_CLOSE);
-      } else if 
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
-        // first time consume and has a start commit
-        final String specifiedStart = 
this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
-        instantRange = 
specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
-            ? null
-            : InstantRange.getInstance(specifiedStart, 
instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
-      } else {
-        // first time consume and no start commit, consumes the latest 
incremental data set.
-        instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), 
instantToIssue.getTimestamp(),
-            InstantRange.RangeType.CLOSE_CLOSE);
-      }
-    } else {
-      LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
-      return;
-    }
-    // generate input split:
-    // 1. first fetch all the commit metadata for the incremental instants;
-    // 2. filter the relative partition paths
-    // 3. filter the full file paths
-    // 4. use the file paths from #step 3 as the back-up of the filesystem view
-
-    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
-        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline)).collect(Collectors.toList());
-    List<HoodieCommitMetadata> archivedMetadataList = 
getArchivedMetadata(instantRange, commitTimeline, tableName);
-    if (archivedMetadataList.size() > 0) {
-      LOG.warn(""
-          + 
"--------------------------------------------------------------------------------\n"
-          + "---------- caution: the reader has fall behind too much from the 
writer,\n"
-          + "---------- tweak 'read.tasks' option to add parallelism of read 
tasks.\n"
-          + 
"--------------------------------------------------------------------------------");
-    }
-    List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
-        ? mergeList(activeMetadataList, archivedMetadataList)
-        : activeMetadataList;
-
-    Set<String> writePartitions = getWritePartitionPaths(metadataList);
-    // apply partition push down
-    if (this.requiredPartitionPaths.size() > 0) {
-      writePartitions = writePartitions.stream()
-          
.filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
-    }
-    FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, 
hadoopConf, metadataList);
-    if (fileStatuses.length == 0) {
-      LOG.warn("No files found for reading in user provided path.");
+    IncrementalInputSplits.Result result =
+        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, 
this.issuedInstant);
+    if (result.isEmpty()) {
+      // no new instants, returns early
       return;
     }
 
-    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
-    final String commitToIssue = instantToIssue.getTimestamp();
-    final AtomicInteger cnt = new AtomicInteger(0);
-    final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
-    List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
-        .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  basePath, logPaths, commitToIssue,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, instantRange);
-            }).collect(Collectors.toList()))
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
-
-    for (MergeOnReadInputSplit split : inputSplits) {
+    for (MergeOnReadInputSplit split : result.getInputSplits()) {
       context.collect(split);
     }
     // update the issues instant time
-    this.issuedInstant = commitToIssue;
+    this.issuedInstant = result.getEndInstant();
     LOG.info(""
-        + "------------------------------------------------------------\n"
-        + "---------- consumed to instant: {}\n"
-        + "------------------------------------------------------------",
-        commitToIssue);
+            + "------------------------------------------------------------\n"
+            + "---------- consumed to instant: {}\n"
+            + "------------------------------------------------------------",
+        this.issuedInstant);
   }
 
   @Override
@@ -343,87 +249,4 @@ public class StreamReadMonitoringFunction
       this.instantState.add(this.issuedInstant);
     }
   }
-
-  /**
-   * Returns the archived metadata in case the reader consumes untimely or it 
wants
-   * to read from the earliest.
-   *
-   * <p>Note: should improve it with metadata table when the metadata table is 
stable enough.
-   *
-   * @param instantRange   The instant range to filter the timeline instants
-   * @param commitTimeline The commit timeline
-   * @param tableName      The table name
-   * @return the list of archived metadata, or empty if there is no need to 
read the archived timeline
-   */
-  private List<HoodieCommitMetadata> getArchivedMetadata(
-      InstantRange instantRange,
-      HoodieTimeline commitTimeline,
-      String tableName) {
-    if (instantRange == null || 
commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
-      // read the archived metadata if:
-      // 1. the start commit is 'earliest';
-      // 2. the start instant is archived.
-      HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline();
-      HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-      if (!archivedCompleteTimeline.empty()) {
-        final String endTs = 
archivedCompleteTimeline.lastInstant().get().getTimestamp();
-        Stream<HoodieInstant> instantStream = 
archivedCompleteTimeline.getInstants();
-        if (instantRange != null) {
-          
archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(), 
endTs);
-          instantStream = instantStream.filter(s -> 
HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, 
instantRange.getStartInstant()));
-        } else {
-          final String startTs = 
archivedCompleteTimeline.firstInstant().get().getTimestamp();
-          archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
-        }
-        return instantStream
-            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, archivedTimeline)).collect(Collectors.toList());
-      }
-    }
-    return Collections.emptyList();
-  }
-
-  /**
-   * Returns the instants with a given issuedInstant to start from.
-   *
-   * @param commitTimeline The completed commits timeline
-   * @param issuedInstant  The last issued instant that has already been 
delivered to downstream
-   * @return the filtered hoodie instants
-   */
-  private List<HoodieInstant> filterInstantsWithStart(
-      HoodieTimeline commitTimeline,
-      final String issuedInstant) {
-    HoodieTimeline completedTimeline = 
commitTimeline.filterCompletedInstants();
-    if (issuedInstant != null) {
-      return completedTimeline.getInstants()
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN, issuedInstant))
-          .collect(Collectors.toList());
-    } else if 
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
-        && 
!this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST))
 {
-      String definedStartCommit = 
this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
-      return completedTimeline.getInstants()
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, definedStartCommit))
-          .collect(Collectors.toList());
-    } else {
-      return completedTimeline.getInstants().collect(Collectors.toList());
-    }
-  }
-
-  /**
-   * Returns all the incremental write partition paths as a set with the given 
commits metadata.
-   *
-   * @param metadataList The commits metadata
-   * @return the partition path set
-   */
-  private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> 
metadataList) {
-    return metadataList.stream()
-        .map(HoodieCommitMetadata::getWritePartitionPaths)
-        .flatMap(Collection::stream)
-        .collect(Collectors.toSet());
-  }
-
-  private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
-    List<T> merged = new ArrayList<>(list1);
-    merged.addAll(list2);
-    return merged;
-  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index cf1cbd5..a2d0960 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -155,6 +155,8 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     setupCompactionOptions(conf);
     // hive options
     setupHiveOptions(conf);
+    // read options
+    setupReadOptions(conf);
     // infer avro schema from physical DDL schema
     inferAvroSchema(conf, 
schema.toPhysicalRowDataType().notNull().getLogicalType());
   }
@@ -271,6 +273,16 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   }
 
   /**
+   * Sets up the read options from the table definition.
+   */
+  private static void setupReadOptions(Configuration conf) {
+    if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+        && (conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || 
conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) {
+      conf.setString(FlinkOptions.QUERY_TYPE, 
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+    }
+  }
+
+  /**
    * Inferences the deserialization Avro schema from the table schema (e.g. 
the DDL)
    * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
    * {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 43743fc..0494143 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -31,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
 import org.apache.hudi.source.FileIndex;
+import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -108,6 +108,9 @@ public class HoodieTableSource implements
 
   private static final int NO_LIMIT_CONSTANT = -1;
 
+  private static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
+      new CollectionInputFormat<>(Collections.emptyList(), null);
+
   private final transient org.apache.hadoop.conf.Configuration hadoopConf;
   private final transient HoodieTableMetaClient metaClient;
   private final long maxCompactionMemoryInBytes;
@@ -220,7 +223,7 @@ public class HoodieTableSource implements
   public Result applyFilters(List<ResolvedExpression> filters) {
     this.filters = new ArrayList<>(filters);
     // refuse all the filters now
-    return Result.of(Collections.emptyList(), new ArrayList<>(filters));
+    return SupportsFilterPushDown.Result.of(Collections.emptyList(), new 
ArrayList<>(filters));
   }
 
   @Override
@@ -256,8 +259,8 @@ public class HoodieTableSource implements
     DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new 
DataType[0]);
 
     return DataTypes.ROW(Arrays.stream(this.requiredPos)
-        .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
-        .toArray(DataTypes.Field[]::new))
+            .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], 
schemaTypes[i]))
+            .toArray(DataTypes.Field[]::new))
         .bridgedTo(RowData.class);
   }
 
@@ -268,16 +271,21 @@ public class HoodieTableSource implements
     return requiredPartitions;
   }
 
+  @Nullable
   private Set<String> getRequiredPartitionPaths() {
     if (this.requiredPartitions == null) {
-      return Collections.emptySet();
+      // returns null for non partition pruning
+      return null;
     }
     return FilePathUtils.toRelativePartitionPaths(this.partitionKeys, 
this.requiredPartitions,
         conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
   }
 
-  private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
-    if (paths.length == 0) {
+  private List<MergeOnReadInputSplit> buildFileIndex() {
+    Set<String> requiredPartitionPaths = getRequiredPartitionPaths();
+    fileIndex.setPartitionPaths(requiredPartitionPaths);
+    List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+    if (relPartitionPaths.size() == 0) {
       return Collections.emptyList();
     }
     FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
@@ -292,19 +300,17 @@ public class HoodieTableSource implements
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
     // generates one input split for each file group
-    return Arrays.stream(paths).map(partitionPath -> {
-      String relPartitionPath = FSUtils.getRelativePartitionPath(path, 
partitionPath);
-      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, 
latestCommit)
-          .map(fileSlice -> {
-            String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-            Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
-                .sorted(HoodieLogFile.getLogFileComparator())
-                .map(logFile -> logFile.getPath().toString())
-                .collect(Collectors.toList()));
-            return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, 
logPaths, latestCommit,
-                metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, null);
-          }).collect(Collectors.toList());
-    })
+    return relPartitionPaths.stream()
+        .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+            .map(fileSlice -> {
+              String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+              Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
+                  .sorted(HoodieLogFile.getLogFileComparator())
+                  .map(logFile -> logFile.getPath().toString())
+                  .collect(Collectors.toList()));
+              return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, 
logPaths, latestCommit,
+                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, null);
+            }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
@@ -319,16 +325,6 @@ public class HoodieTableSource implements
   }
 
   private InputFormat<RowData, ?> getBatchInputFormat() {
-    // When this table has no partition, just return an empty source.
-    if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-      return new CollectionInputFormat<>(Collections.emptyList(), null);
-    }
-
-    final Path[] paths = getReadPaths();
-    if (paths.length == 0) {
-      return new CollectionInputFormat<>(Collections.emptyList(), null);
-    }
-
     final Schema tableAvroSchema = getTableAvroSchema();
     final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
@@ -340,62 +336,37 @@ public class HoodieTableSource implements
         final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
         switch (tableType) {
           case MERGE_ON_READ:
-            final List<MergeOnReadInputSplit> inputSplits = 
buildFileIndex(paths);
+            final List<MergeOnReadInputSplit> inputSplits = buildFileIndex();
             if (inputSplits.size() == 0) {
               // When there is no input splits, just return an empty source.
               LOG.warn("No input splits generate for MERGE_ON_READ input 
format, returns empty collection instead");
-              return new CollectionInputFormat<>(Collections.emptyList(), 
null);
+              return EMPTY_INPUT_FORMAT;
             }
-            final MergeOnReadTableState hoodieTableState = new 
MergeOnReadTableState(
-                rowType,
-                requiredRowType,
-                tableAvroSchema.toString(),
-                
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-                inputSplits,
-                conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
-            return MergeOnReadInputFormat.builder()
-                .config(this.conf)
-                .paths(FilePathUtils.toFlinkPaths(paths))
-                .tableState(hoodieTableState)
-                // use the explicit fields data type because the 
AvroSchemaConverter
-                // is not very stable.
-                .fieldTypes(rowDataType.getChildren())
-                
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
-                .limit(this.limit)
-                .emitDelete(false)
-                .build();
+            return mergeOnReadInputFormat(rowType, requiredRowType, 
tableAvroSchema,
+                rowDataType, inputSplits, false);
           case COPY_ON_WRITE:
-            FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
-                FilePathUtils.toFlinkPaths(paths),
-                this.schema.getColumnNames().toArray(new String[0]),
-                this.schema.getColumnDataTypes().toArray(new DataType[0]),
-                this.requiredPos,
-                this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
-                this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, 
// ParquetInputFormat always uses the limit value
-                getParquetConf(this.conf, this.hadoopConf),
-                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
-            );
-            format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
-            return format;
+            return baseFileOnlyInputFormat();
           default:
             throw new HoodieException("Unexpected table type: " + 
this.conf.getString(FlinkOptions.TABLE_TYPE));
         }
       case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
-        FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
-            FilePathUtils.toFlinkPaths(paths),
-            this.schema.getColumnNames().toArray(new String[0]),
-            this.schema.getColumnDataTypes().toArray(new DataType[0]),
-            this.requiredPos,
-            "default",
-            this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // 
ParquetInputFormat always uses the limit value
-            getParquetConf(this.conf, this.hadoopConf),
-            this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
-        );
-        format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
-        return format;
+        return baseFileOnlyInputFormat();
+      case FlinkOptions.QUERY_TYPE_INCREMENTAL:
+        IncrementalInputSplits incrementalInputSplits = 
IncrementalInputSplits.builder()
+            .conf(conf).path(FilePathUtils.toFlinkPath(path))
+            .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
+            .requiredPartitions(getRequiredPartitionPaths()).build();
+        final IncrementalInputSplits.Result result = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf);
+        if (result.isEmpty()) {
+          // When there is no input splits, just return an empty source.
+          LOG.warn("No input splits generate for incremental read, returns 
empty collection instead");
+          return new CollectionInputFormat<>(Collections.emptyList(), null);
+        }
+        return mergeOnReadInputFormat(rowType, requiredRowType, 
tableAvroSchema,
+            rowDataType, result.getInputSplits(), false);
       default:
-        String errMsg = String.format("Invalid query type : '%s', options 
['%s', '%s'] are supported now", queryType,
-            FlinkOptions.QUERY_TYPE_SNAPSHOT, 
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+        String errMsg = String.format("Invalid query type : '%s', options 
['%s', '%s', '%s'] are supported now", queryType,
+            FlinkOptions.QUERY_TYPE_SNAPSHOT, 
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
         throw new HoodieException(errMsg);
     }
   }
@@ -408,56 +379,62 @@ public class HoodieTableSource implements
     final RowType requiredRowType = (RowType) 
getProducedDataType().notNull().getLogicalType();
 
     final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
-    org.apache.flink.core.fs.Path[] paths = new 
org.apache.flink.core.fs.Path[0];
     if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
       final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
-      switch (tableType) {
-        case MERGE_ON_READ:
-          final MergeOnReadTableState hoodieTableState = new 
MergeOnReadTableState(
-              rowType,
-              requiredRowType,
-              tableAvroSchema.toString(),
-              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-              Collections.emptyList(),
-              conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
-          return MergeOnReadInputFormat.builder()
-              .config(this.conf)
-              .paths(paths)
-              .tableState(hoodieTableState)
-              // use the explicit fields data type because the 
AvroSchemaConverter
-              // is not very stable.
-              .fieldTypes(rowDataType.getChildren())
-              
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
-              .limit(this.limit)
-              .emitDelete(true)
-              .build();
-        case COPY_ON_WRITE:
-          final MergeOnReadTableState hoodieTableState2 = new 
MergeOnReadTableState(
-              rowType,
-              requiredRowType,
-              tableAvroSchema.toString(),
-              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-              Collections.emptyList(),
-              conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
-          return MergeOnReadInputFormat.builder()
-              .config(this.conf)
-              .paths(paths)
-              .tableState(hoodieTableState2)
-              // use the explicit fields data type because the 
AvroSchemaConverter
-              // is not very stable.
-              .fieldTypes(rowDataType.getChildren())
-              
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
-              .limit(this.limit)
-              .build();
-        default:
-          throw new HoodieException("Unexpected table type: " + 
this.conf.getString(FlinkOptions.TABLE_TYPE));
-      }
+      boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
+      return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+          rowDataType, Collections.emptyList(), emitDelete);
     }
     String errMsg = String.format("Invalid query type : '%s', options ['%s'] 
are supported now", queryType,
         FlinkOptions.QUERY_TYPE_SNAPSHOT);
     throw new HoodieException(errMsg);
   }
 
+  private MergeOnReadInputFormat mergeOnReadInputFormat(
+      RowType rowType,
+      RowType requiredRowType,
+      Schema tableAvroSchema,
+      DataType rowDataType,
+      List<MergeOnReadInputSplit> inputSplits,
+      boolean emitDelete) {
+    final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+        rowType,
+        requiredRowType,
+        tableAvroSchema.toString(),
+        AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+        inputSplits,
+        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+    return MergeOnReadInputFormat.builder()
+        .config(this.conf)
+        .tableState(hoodieTableState)
+        // use the explicit fields' data type because the AvroSchemaConverter
+        // is not very stable.
+        .fieldTypes(rowDataType.getChildren())
+        .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+        .limit(this.limit)
+        .emitDelete(emitDelete)
+        .build();
+  }
+
+  private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
+    final Path[] paths = getReadPaths();
+    if (paths.length == 0) {
+      return EMPTY_INPUT_FORMAT;
+    }
+    FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+        FilePathUtils.toFlinkPaths(paths),
+        this.schema.getColumnNames().toArray(new String[0]),
+        this.schema.getColumnDataTypes().toArray(new DataType[0]),
+        this.requiredPos,
+        this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
+        this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // 
ParquetInputFormat always uses the limit value
+        getParquetConf(this.conf, this.hadoopConf),
+        this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+    );
+    format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+    return format;
+  }
+
   private Schema inferSchemaFromDdl() {
     Schema schema = 
AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
     return HoodieAvroUtils.addMetadataFields(schema, 
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 2042b96..e3a8eee 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -85,8 +84,6 @@ public class MergeOnReadInputFormat
 
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
 
-  private Path[] paths;
-
   private final MergeOnReadTableState tableState;
 
   /**
@@ -134,14 +131,12 @@ public class MergeOnReadInputFormat
 
   private MergeOnReadInputFormat(
       Configuration conf,
-      Path[] paths,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
       long limit,
       boolean emitDelete) {
     this.conf = conf;
-    this.paths = paths;
     this.tableState = tableState;
     this.fieldNames = tableState.getRowType().getFieldNames();
     this.fieldTypes = fieldTypes;
@@ -165,7 +160,7 @@ public class MergeOnReadInputFormat
     this.currentReadCount = 0L;
     this.hadoopConf = StreamerUtil.getHadoopConf();
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
-      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+      if (split.getInstantRange() != null) {
         // base file only with commit time filtering
         this.iterator = new BaseFileOnlyFilteringIterator(
             split.getInstantRange(),
@@ -212,16 +207,8 @@ public class MergeOnReadInputFormat
 
   @Override
   public void configure(Configuration configuration) {
-    if (this.paths.length == 0) {
-      // file path was not specified yet. Try to set it from the parameters.
-      String filePath = configuration.getString(FlinkOptions.PATH, null);
-      if (filePath == null) {
-        throw new IllegalArgumentException("File path was not specified in 
input format or configuration.");
-      } else {
-        this.paths = new Path[] {new Path(filePath)};
-      }
-    }
-    // may supports nested files in the future.
+    // no operation
+    // may support nested files in the future.
   }
 
   @Override
@@ -750,7 +737,6 @@ public class MergeOnReadInputFormat
    */
   public static class Builder {
     private Configuration conf;
-    private Path[] paths;
     private MergeOnReadTableState tableState;
     private List<DataType> fieldTypes;
     private String defaultPartName;
@@ -762,11 +748,6 @@ public class MergeOnReadInputFormat
       return this;
     }
 
-    public Builder paths(Path[] paths) {
-      this.paths = paths;
-      return this;
-    }
-
     public Builder tableState(MergeOnReadTableState tableState) {
       this.tableState = tableState;
       return this;
@@ -793,8 +774,8 @@ public class MergeOnReadInputFormat
     }
 
     public MergeOnReadInputFormat build() {
-      return new MergeOnReadInputFormat(conf, paths, tableState,
-          fieldTypes, defaultPartName, limit, emitDelete);
+      return new MergeOnReadInputFormat(conf, tableState, fieldTypes,
+          defaultPartName, limit, emitDelete);
     }
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index f229f2d..334df59 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -88,4 +88,18 @@ public class TestFileIndex {
     assertThat(fileStatuses.length, is(1));
     
assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testFileListingEmptyTable(boolean enableMetadata) {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);
+    FileIndex fileIndex = FileIndex.instance(new 
Path(tempFile.getAbsolutePath()), conf);
+    List<String> partitionKeys = Collections.singletonList("partition");
+    List<Map<String, String>> partitions = 
fileIndex.getPartitions(partitionKeys, "default", false);
+    assertThat(partitions.size(), is(0));
+
+    FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
+    assertThat(fileStatuses.length, is(0));
+  }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index d13f683..3687e9d 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -144,7 +144,7 @@ public class TestStreamReadMonitoringFunction {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     String specifiedCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
-    conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit);
+    conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
       harness.setup();
@@ -175,7 +175,7 @@ public class TestStreamReadMonitoringFunction {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     String specifiedCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
-    conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
+    conf.setString(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
       harness.setup();
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 233e6fa..911c685 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
@@ -45,7 +44,6 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -263,10 +261,8 @@ public class TestStreamReadOperator {
         
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
         Collections.emptyList(),
         new String[0]);
-    Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, 
hadoopConf, partitionKeys);
     MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
         .config(conf)
-        .paths(FilePathUtils.toFlinkPaths(paths))
         .tableState(hoodieTableState)
         .fieldTypes(rowDataType.getChildren())
         .defaultPartName("default").limit(1000L)
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 9d0bcab..db7111b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -113,7 +113,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.TABLE_TYPE, tableType)
-        .option(FlinkOptions.READ_STREAMING_START_COMMIT, firstCommit)
+        .option(FlinkOptions.READ_START_COMMIT, firstCommit)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
@@ -186,7 +186,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.TABLE_TYPE, tableType)
-        .option(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit)
+        .option(FlinkOptions.READ_START_COMMIT, specifiedCommit)
         .end();
     streamTableEnv.executeSql(createHoodieTable2);
     List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
@@ -289,7 +289,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
-        .option(FlinkOptions.READ_STREAMING_START_COMMIT, latestCommit)
+        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
         .option(FlinkOptions.CHANGELOG_ENABLED, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
@@ -343,7 +343,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         .option(FlinkOptions.READ_AS_STREAMING, true)
-        .option(FlinkOptions.READ_STREAMING_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST)
+        .option(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST)
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
         // close the async compaction
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
@@ -879,6 +879,33 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
     assertRowsEquals(result1, "[+I[1.23, 12345678.12, 12345.12, 
123456789.123450000000000000]]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testIncrementalRead(HoodieTableType tableType) throws Exception {
+    TableEnvironment tableEnv = batchTableEnv;
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+
+    // write 3 batches of data set
+    TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+    TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+    String latestCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, TestData.dataSetInsert(5, 6));
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 1572dd4..bbbb49d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -232,6 +232,32 @@ public class TestHoodieTableFactory {
   }
 
   @Test
+  void testSetupReadOptionsForSource() {
+    // definition with simple primary key and partition path
+    ResolvedSchema schema1 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    // set the read end commit so the factory infers the incremental query mode
+    this.conf.setString(FlinkOptions.READ_END_COMMIT, "123");
+
+    final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    final HoodieTableSource tableSource1 = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext1);
+    final Configuration conf1 = tableSource1.getConf();
+    assertThat(conf1.getString(FlinkOptions.QUERY_TYPE), 
is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
+
+    this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
+    this.conf.setString(FlinkOptions.READ_START_COMMIT, "123");
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    final HoodieTableSource tableSource2 = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext2);
+    final Configuration conf2 = tableSource2.getConf();
+    assertThat(conf2.getString(FlinkOptions.QUERY_TYPE), 
is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
+  }
+
+  @Test
   void testInferAvroSchemaForSink() {
     // infer the schema if not specified
     final HoodieTableSink tableSink1 =
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index d50a716..8ee18a9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -31,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.ThrowingSupplier;
 import org.junit.jupiter.api.io.TempDir;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,9 +46,9 @@ import java.util.stream.Collectors;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Test cases for HoodieTableSource.
@@ -112,9 +112,9 @@ public class TestHoodieTableSource {
     inputFormat = tableSource.getInputFormat();
     assertThat(inputFormat, is(instanceOf(MergeOnReadInputFormat.class)));
     conf.setString(FlinkOptions.QUERY_TYPE.key(), 
FlinkOptions.QUERY_TYPE_INCREMENTAL);
-    assertThrows(HoodieException.class,
-        () -> tableSource.getInputFormat(),
-        "Invalid query type : 'incremental'. Only 'snapshot' is supported 
now");
+    assertDoesNotThrow(
+        (ThrowingSupplier<? extends InputFormat<RowData, ?>>) 
tableSource::getInputFormat,
+        "Query type: 'incremental' should be supported");
   }
 
   @Test
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f83b2d9..d469205 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieTableSource;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
@@ -44,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -71,12 +74,7 @@ public class TestInputFormat {
     options.forEach((key, value) -> conf.setString(key, value));
 
     StreamerUtil.initTableIfNotExists(conf);
-    this.tableSource = new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getAbsolutePath()),
-        Collections.singletonList("partition"),
-        "default",
-        conf);
+    this.tableSource = getTableSource(conf);
   }
 
   @ParameterizedTest
@@ -385,10 +383,81 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testReadIncrementally(HoodieTableType tableType) throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.QUERY_TYPE.key(), 
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+    beforeEach(tableType, options);
+
+    // write 3 batches of data set, one commit each
+    for (int i = 0; i < 6; i += 2) {
+      List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+      TestData.writeData(dataset, conf);
+    }
+
+    HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath());
+    List<String> commits = 
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+    assertThat(commits.size(), is(3));
+
+    // only the start commit
+    conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
+    assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> actual1 = readData(inputFormat1);
+    final List<RowData> expected1 = TestData.dataSetInsert(3, 4, 5, 6);
+    TestData.assertRowDataEquals(actual1, expected1);
+
+    // only the start commit: earliest
+    conf.setString(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST);
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat2 = this.tableSource.getInputFormat();
+    assertThat(inputFormat2, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> actual2 = readData(inputFormat2);
+    final List<RowData> expected2 = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
+    TestData.assertRowDataEquals(actual2, expected2);
+
+    // start and end commit: [start commit, end commit]
+    conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0));
+    conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat3 = this.tableSource.getInputFormat();
+    assertThat(inputFormat3, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> actual3 = readData(inputFormat3);
+    final List<RowData> expected3 = TestData.dataSetInsert(1, 2, 3, 4);
+    TestData.assertRowDataEquals(actual3, expected3);
+
+    // only the end commit: point in time query
+    conf.removeConfig(FlinkOptions.READ_START_COMMIT);
+    conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
+    assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> actual4 = readData(inputFormat4);
+    final List<RowData> expected4 = TestData.dataSetInsert(3, 4);
+    TestData.assertRowDataEquals(actual4, expected4);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
 
+  private HoodieTableSource getTableSource(Configuration conf) {
+    return new HoodieTableSource(
+        TestConfigurations.TABLE_SCHEMA,
+        new Path(tempFile.getAbsolutePath()),
+        Collections.singletonList("partition"),
+        "default",
+        conf);
+  }
+
   @SuppressWarnings("unchecked, rawtypes")
   private static List<RowData> readData(InputFormat inputFormat) throws 
IOException {
     InputSplit[] inputSplits = inputFormat.createInputSplits(1);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 3e0afc2..b0f7b5f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -284,6 +284,14 @@ public class TestData {
           TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
   );
 
+  public static List<RowData> dataSetInsert(int... ids) {
+    List<RowData> inserts = new ArrayList<>();
+    Arrays.stream(ids).forEach(i -> inserts.add(
+        insertRow(StringData.fromString("id" + i), 
StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+    return inserts;
+  }
+
   private static Integer toIdSafely(Object id) {
     if (id == null) {
       return -1;
@@ -424,7 +432,7 @@ public class TestData {
    */
   public static void assertRowDataEquals(List<RowData> rows, List<RowData> 
expected) {
     String rowsString = rowDataToString(rows);
-    assertThat(rowDataToString(expected), is(rowsString));
+    assertThat(rowsString, is(rowDataToString(expected)));
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 4e9ad51..3719705 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
 import java.io.File;
-import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -58,6 +57,6 @@ public class TestUtils {
 
   public static StreamReadMonitoringFunction getMonitorFunc(Configuration 
conf) {
     final String basePath = conf.getString(FlinkOptions.PATH);
-    return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 
1024L, Collections.emptySet());
+    return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 
1024L, null);
   }
 }

Reply via email to