This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fdae68  [HUDI-1663] Streaming read for Flink MOR table (#2640)
2fdae68 is described below

commit 2fdae6835ce3fcad3111205d2373a69b34788483
Author: Danny Chan <[email protected]>
AuthorDate: Wed Mar 10 22:44:06 2021 +0800

    [HUDI-1663] Streaming read for Flink MOR table (#2640)
    
    Supports two read modes:
    * Read the full data set starting from the latest commit instant and
      subsequent incremental data set
    * Read data set that starts from a specified commit instant
---
 .../hudi/common/model/HoodieCommitMetadata.java    |   5 +
 .../org/apache/hudi/operator/FlinkOptions.java     |  36 +-
 .../operator/StreamReadMonitoringFunction.java     | 372 +++++++++++++++++++++
 .../apache/hudi/operator/StreamReadOperator.java   | 237 +++++++++++++
 .../java/org/apache/hudi/sink/HoodieTableSink.java |   2 +-
 .../org/apache/hudi/source/HoodieTableSource.java  | 114 +++----
 .../apache/hudi/source/format/FilePathUtils.java   | 108 ++++++
 .../hudi/source/format/mor/InstantRange.java       | 101 ++++++
 .../source/format/mor/MergeOnReadInputFormat.java  |  11 +
 .../source/format/mor/MergeOnReadInputSplit.java   |   9 +-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |   2 +-
 .../hudi/streamer/HoodieFlinkStreamerV2.java       |   2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  17 +-
 .../hudi/operator/utils/TestConfigurations.java    |  29 +-
 .../org/apache/hudi/operator/utils/TestData.java   | 105 +++++-
 .../apache/hudi/source/HoodieDataSourceITCase.java | 118 +++++--
 ...eSourceTest.java => TestHoodieTableSource.java} |   4 +-
 .../source/TestStreamReadMonitoringFunction.java   | 269 +++++++++++++++
 .../apache/hudi/source/TestStreamReadOperator.java | 290 ++++++++++++++++
 .../{InputFormatTest.java => TestInputFormat.java} |  23 +-
 .../test/java/org/apache/hudi/utils/TestUtils.java |  64 ++++
 .../utils/factory/CollectSinkTableFactory.java     | 174 ++++++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 hudi-flink/src/test/resources/test_source2.data    |   8 +
 24 files changed, 1989 insertions(+), 128 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index bc9a4ba..da72b16 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -36,6 +36,7 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -339,6 +340,10 @@ public class HoodieCommitMetadata implements Serializable {
         maxEventTime == Long.MIN_VALUE ? Option.empty() : 
Option.of(maxEventTime));
   }
 
+  public HashSet<String> getWritePartitionPaths() {
+    return new HashSet<>(partitionToWriteStats.keySet());
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
index 10dc5be..a724830 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -68,6 +69,12 @@ public class FlinkOptions {
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
+  public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
+      .key("read.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do actual read, default is 
4");
+
   public static final ConfigOption<String> READ_SCHEMA_FILE_PATH = 
ConfigOptions
       .key("read.schema.file.path")
       .stringType()
@@ -112,6 +119,25 @@ public class FlinkOptions {
           + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But 
Hive 3.x"
           + " use UTC timezone, by default true");
 
+  public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
+      .key("read.streaming.enabled")
+      .booleanType()
+      .defaultValue(false)// default read as batch
+      .withDescription("Whether to read as streaming source, default false");
+
+  public static final ConfigOption<Integer> READ_STREAMING_CHECK_INTERVAL = 
ConfigOptions
+      .key("read.streaming.check-interval")
+      .intType()
+      .defaultValue(60)// default 1 minute
+      .withDescription("Check interval for streaming read of SECOND, default 1 
minute");
+
+  public static final ConfigOption<String> READ_STREAMING_START_COMMIT = 
ConfigOptions
+      .key("read.streaming.start-commit")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Start commit instant for streaming read, the commit 
time format should be 'yyyyMMddHHmmss', "
+          + "by default reading from the latest instant");
+
   // ------------------------------------------------------------------------
   //  Write Options
   // ------------------------------------------------------------------------
@@ -121,8 +147,8 @@ public class FlinkOptions {
       .noDefaultValue()
       .withDescription("Table name to register to Hive metastore");
 
-  public static final String TABLE_TYPE_COPY_ON_WRITE = "COPY_ON_WRITE";
-  public static final String TABLE_TYPE_MERGE_ON_READ = "MERGE_ON_READ";
+  public static final String TABLE_TYPE_COPY_ON_WRITE = 
HoodieTableType.COPY_ON_WRITE.name();
+  public static final String TABLE_TYPE_MERGE_ON_READ = 
HoodieTableType.MERGE_ON_READ.name();
   public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
       .key("write.table.type")
       .stringType()
@@ -203,8 +229,8 @@ public class FlinkOptions {
       .defaultValue(SimpleAvroKeyGenerator.class.getName())
       .withDescription("Key generator class, that implements will extract the 
key out of incoming record");
 
-  public static final ConfigOption<Integer> WRITE_TASK_PARALLELISM = 
ConfigOptions
-      .key("write.task.parallelism")
+  public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
+      .key("write.tasks")
       .intType()
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do actual write, default is 
4");
@@ -290,7 +316,7 @@ public class FlinkOptions {
     conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
     conf.setString(FlinkOptions.PARTITION_PATH_FIELD, 
config.partitionPathField);
     conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
-    conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum);
+    conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
 
     return conf;
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
new file mode 100644
index 0000000..2e24aca
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
+ * , it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files(or split) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The path to monitor.
+   */
+  private final Path path;
+
+  /**
+   * The interval between consecutive path scans.
+   */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final long maxCompactionMemoryInBytes;
+
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaClient;
+    this.interval = 
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    ValidationUtils.checkState(this.instantState == null,
+        "The " + getClass().getSimpleName() + " has already been 
initialized.");
+
+    this.instantState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "file-monitoring-state",
+            StringSerializer.INSTANCE
+        )
+    );
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the class {} with table {} and base path 
{}.",
+          getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME), 
path);
+
+      List<String> retrievedStates = new ArrayList<>();
+      for (String entry : this.instantState.get()) {
+        retrievedStates.add(entry);
+      }
+
+      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+          getClass().getSimpleName() + " retrieved invalid state.");
+
+      if (retrievedStates.size() == 1 && issuedInstant != null) {
+        // this is the case where we have both legacy and new state.
+        // the two should be mutually exclusive for the operator, thus we 
throw the exception.
+
+        throw new IllegalArgumentException(
+            "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
+
+      } else if (retrievedStates.size() == 1) {
+        this.issuedInstant = retrievedStates.get(0);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} retrieved a issued instant of time {} for table {} 
with path {}.",
+              getClass().getSimpleName(), issuedInstant, 
conf.get(FlinkOptions.TABLE_NAME), path);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+  }
+
+  @Override
+  public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) 
throws Exception {
+    checkpointLock = context.getCheckpointLock();
+    while (isRunning) {
+      synchronized (checkpointLock) {
+        monitorDirAndForwardSplits(context);
+      }
+      TimeUnit.SECONDS.sleep(interval);
+    }
+  }
+
+  @VisibleForTesting
+  public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> 
context) {
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    if (commitTimeline.empty()) {
+      LOG.warn("No splits found for the table under path " + path);
+      return;
+    }
+    List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, 
this.issuedInstant);
+    // get the latest instant that satisfies condition
+    final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
+    final InstantRange instantRange;
+    if (instantToIssue != null) {
+      if (this.issuedInstant != null) {
+        // had already consumed an instant
+        instantRange = InstantRange.getInstance(this.issuedInstant, 
instantToIssue.getTimestamp(),
+            InstantRange.RangeType.OPEN_CLOSE);
+      } else if 
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
+        // first time consume and has a start commit
+        final String specifiedStart = 
this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
+        instantRange = InstantRange.getInstance(specifiedStart, 
instantToIssue.getTimestamp(),
+            InstantRange.RangeType.CLOSE_CLOSE);
+      } else {
+        // first time consume and no start commit,
+        // would consume all the snapshot data PLUS incremental data set
+        instantRange = null;
+      }
+    } else {
+      LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
+      return;
+    }
+    // generate input split:
+    // 1. first fetch all the commit metadata for the incremental instants;
+    // 2. filter the relative partition paths
+    // 3. filter the full file paths
+    // 4. use the file paths from #step 3 as the back-up of the filesystem view
+
+    List<HoodieCommitMetadata> metadataList = instants.stream()
+        .map(instant -> getCommitMetadata(instant, 
commitTimeline)).collect(Collectors.toList());
+    Set<String> writePartitions = getWritePartitionPaths(metadataList);
+    FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList);
+    if (fileStatuses.length == 0) {
+      throw new HoodieException("No files found for reading in user provided 
path.");
+    }
+
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
+    final String commitToIssue = instantToIssue.getTimestamp();
+    final AtomicInteger cnt = new AtomicInteger(0);
+    final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
+    List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
+        .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
+        .map(fileSlice -> {
+          Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
+              .sorted(HoodieLogFile.getLogFileComparator())
+              .map(logFile -> logFile.getPath().toString())
+              .collect(Collectors.toList()));
+          return new MergeOnReadInputSplit(cnt.getAndAdd(1),
+              null, logPaths, commitToIssue,
+              metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, 
instantRange);
+        }).collect(Collectors.toList()))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+
+    for (MergeOnReadInputSplit split : inputSplits) {
+      context.collect(split);
+    }
+    // update the issues instant time
+    this.issuedInstant = commitToIssue;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+
+    if (checkpointLock != null) {
+      synchronized (checkpointLock) {
+        issuedInstant = null;
+        isRunning = false;
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Closed File Monitoring Source for path: " + path + ".");
+    }
+  }
+
+  @Override
+  public void cancel() {
+    if (checkpointLock != null) {
+      // this is to cover the case where cancel() is called before the run()
+      synchronized (checkpointLock) {
+        issuedInstant = null;
+        isRunning = false;
+      }
+    } else {
+      issuedInstant = null;
+      isRunning = false;
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Checkpointing
+  // -------------------------------------------------------------------------
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    this.instantState.clear();
+    if (this.issuedInstant != null) {
+      this.instantState.add(this.issuedInstant);
+    }
+  }
+
+  /**
+   * Returns the uncompacted instants with a given issuedInstant to start from.
+   *
+   * @param commitTimeline The completed commits timeline
+   * @param issuedInstant  The last issued instant that has already been 
delivered to downstream
+   * @return the filtered hoodie instants
+   */
+  private List<HoodieInstant> getUncompactedInstants(
+      HoodieTimeline commitTimeline,
+      final String issuedInstant) {
+    if (issuedInstant != null) {
+      return commitTimeline.getInstants()
+          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
+          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN, issuedInstant))
+          .collect(Collectors.toList());
+    } else if 
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
+      String definedStartCommit = 
this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
+      return commitTimeline.getInstants()
+          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
+          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
GREATER_THAN_OR_EQUALS, definedStartCommit))
+          .collect(Collectors.toList());
+    } else {
+      return commitTimeline.getInstants()
+          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Returns all the incremental write partition paths as a set with the given 
commits metadata.
+   *
+   * @param metadataList The commits metadata
+   * @return the partition path set
+   */
+  private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> 
metadataList) {
+    return metadataList.stream()
+        .map(HoodieCommitMetadata::getWritePartitionPaths)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Returns all the incremental write file path statuses with the given 
commits metadata.
+   *
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  private FileStatus[] getWritePathsOfInstants(List<HoodieCommitMetadata> 
metadataList) {
+    FileSystem fs = FSUtils.getFs(path.getPath(), hadoopConf);
+    return metadataList.stream().map(metadata -> 
getWritePathsOfInstant(metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata 
metadata, FileSystem fs) {
+    return metadata.getFileIdAndFullPaths(path.getPath()).values().stream()
+        .map(path -> {
+          try {
+            return fs.getFileStatus(new org.apache.hadoop.fs.Path(path));
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, 
HoodieTimeline timeline) {
+    byte[] data = timeline.getInstantDetails(instant).get();
+    try {
+      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    } catch (IOException e) {
+      LOG.error("Get write metadata for table {} with instant {} and path: {} 
error",
+          conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), 
path);
+      throw new HoodieException(e);
+    }
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java 
b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java
new file mode 100644
index 0000000..d12147d
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadOperator.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * The operator that reads the {@link MergeOnReadInputSplit splits} received 
from the preceding {@link
+ * StreamReadMonitoringFunction}. Contrary to the {@link 
StreamReadMonitoringFunction} which has a parallelism of 1,
+ * this operator can have multiple parallelism.
+ *
+ * <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it 
is put in a queue,
+ * the {@link MailboxExecutor} read the actual data of the split.
+ * This architecture allows the separation of split reading from processing 
the checkpoint barriers,
+ * thus removing any potential back-pressure.
+ */
+public class StreamReadOperator extends AbstractStreamOperator<RowData>
+    implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadOperator.class);
+
+  // It's the same thread that runs this operator and checkpoint actions. Use 
this executor to schedule only
+  // splits for subsequent reading, so that a new checkpoint could be 
triggered without blocking a long time
+  // for exhausting all scheduled split reading tasks.
+  private final MailboxExecutor executor;
+
+  private MergeOnReadInputFormat format;
+
+  private transient SourceFunction.SourceContext<RowData> sourceContext;
+
+  private transient ListState<MergeOnReadInputSplit> inputSplitsState;
+  private transient Queue<MergeOnReadInputSplit> splits;
+
+  // Splits are read by the same thread that calls #processElement. Each read 
task is submitted to that thread by adding
+  // them to the executor. This state is used to ensure that only one read 
task is in that splits queue at a time, so that
+  // read tasks do not accumulate ahead of checkpoint tasks. When there is a 
read task in the queue, this is set to RUNNING.
+  // When there are no more files to read, this will be set to IDLE.
+  private transient SplitState currentSplitState;
+
+  private StreamReadOperator(MergeOnReadInputFormat format, 
ProcessingTimeService timeService,
+                             MailboxExecutor mailboxExecutor) {
+    this.format = Preconditions.checkNotNull(format, "The InputFormat should 
not be null.");
+    this.processingTimeService = timeService;
+    this.executor = Preconditions.checkNotNull(mailboxExecutor, "The 
mailboxExecutor should not be null.");
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    // TODO Replace Java serialization with Avro approach to keep state 
compatibility.
+    inputSplitsState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>("splits", new JavaSerializer<>()));
+
+    // Initialize the current split state to IDLE.
+    currentSplitState = SplitState.IDLE;
+
+    // Recover splits state from flink state backend if possible.
+    splits = new LinkedBlockingDeque<>();
+    if (context.isRestored()) {
+      int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+      LOG.info("Restoring state for operator {} (task ID: {}).", 
getClass().getSimpleName(), subtaskIdx);
+
+      for (MergeOnReadInputSplit split : inputSplitsState.get()) {
+        splits.add(split);
+      }
+    }
+
+    this.sourceContext = StreamSourceContexts.getSourceContext(
+        getOperatorConfig().getTimeCharacteristic(),
+        getProcessingTimeService(),
+        new Object(), // no actual locking needed
+        getContainingTask().getStreamStatusMaintainer(),
+        output,
+        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+        -1);
+
+    // Enqueue to process the recovered input splits.
+    enqueueProcessSplits();
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    inputSplitsState.clear();
+    inputSplitsState.addAll(new ArrayList<>(splits));
+  }
+
+  @Override
+  public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
+    splits.add(element.getValue());
+    enqueueProcessSplits();
+  }
+
+  private void enqueueProcessSplits() {
+    if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
+      currentSplitState = SplitState.RUNNING;
+      executor.execute(this::processSplits, this.getClass().getSimpleName());
+    }
+  }
+
+  private void processSplits() throws IOException {
+    MergeOnReadInputSplit split = splits.poll();
+    if (split == null) {
+      currentSplitState = SplitState.IDLE;
+      return;
+    }
+
+    format.open(split);
+    try {
+      RowData nextElement = null;
+      while (!format.reachedEnd()) {
+        nextElement = format.nextRecord(nextElement);
+        sourceContext.collect(nextElement);
+      }
+    } finally {
+      currentSplitState = SplitState.IDLE;
+      format.close();
+    }
+
+    // Re-schedule to process the next split.
+    enqueueProcessSplits();
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) {
+    // we do nothing because we emit our own watermarks if needed.
+  }
+
+  @Override
+  public void dispose() throws Exception {
+    super.dispose();
+
+    if (format != null) {
+      format.close();
+      format.closeInputFormat();
+      format = null;
+    }
+
+    sourceContext = null;
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    output.close();
+    if (sourceContext != null) {
+      sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+      sourceContext.close();
+      sourceContext = null;
+    }
+  }
+
+  public static OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory(MergeOnReadInputFormat format) {
+    return new OperatorFactory(format);
+  }
+
+  private enum SplitState {
+    IDLE, RUNNING
+  }
+
+  private static class OperatorFactory extends 
AbstractStreamOperatorFactory<RowData>
+      implements YieldingOperatorFactory<RowData>, 
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
+
+    private final MergeOnReadInputFormat format;
+
+    private transient MailboxExecutor mailboxExecutor;
+
+    private OperatorFactory(MergeOnReadInputFormat format) {
+      this.format = format;
+    }
+
+    @Override
+    public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+      this.mailboxExecutor = mailboxExecutor;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <O extends StreamOperator<RowData>> O 
createStreamOperator(StreamOperatorParameters<RowData> parameters) {
+      StreamReadOperator operator = new StreamReadOperator(format, 
processingTimeService, mailboxExecutor);
+      operator.setup(parameters.getContainingTask(), 
parameters.getStreamConfig(), parameters.getOutput());
+      return (O) operator;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
+      return StreamReadOperator.class;
+    }
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java
index 5f979c7..3ba8381 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/HoodieTableSink.java
@@ -65,7 +65,7 @@ public class HoodieTableSink implements 
AppendStreamTableSink<RowData>, Partitio
   public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
     // Read from kafka source
     RowType rowType = (RowType) 
this.schema.toRowDataType().notNull().getLogicalType();
-    int numWriteTasks = 
this.conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+    int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS);
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new 
StreamWriteOperatorFactory<>(conf, isBounded);
 
     DataStream<Object> pipeline = dataStream
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
index 0bc219b..9ed753a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/HoodieTableSource.java
@@ -21,14 +21,18 @@ package org.apache.hudi.source;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.StreamReadOperator;
 import org.apache.hudi.source.format.FilePathUtils;
 import org.apache.hudi.source.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
@@ -48,10 +52,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.Expression;
@@ -75,7 +80,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -148,6 +152,11 @@ public class HoodieTableSource implements
     this.hadoopConf = StreamerUtil.getHadoopConf();
     this.metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(this.hadoopConf));
+    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+      ValidationUtils.checkArgument(
+          
conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
+          "Streaming read is only supported for table type: " + 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    }
   }
 
   @Override
@@ -155,14 +164,29 @@ public class HoodieTableSource implements
     @SuppressWarnings("unchecked")
     TypeInformation<RowData> typeInfo =
         (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
-    InputFormatSourceFunction<RowData> func = new 
InputFormatSourceFunction<>(getInputFormat(), typeInfo);
-    DataStreamSource<RowData> source = execEnv.addSource(func, 
explainSource(), typeInfo);
-    return source.name(explainSource());
+    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
+          conf, path, metaClient, maxCompactionMemoryInBytes);
+      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
+      SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, "streaming_source")
+          .setParallelism(1)
+          .uid("uid_streaming_source")
+          .transform("split_reader", typeInfo, factory)
+          .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
+          .uid("uid_split_reader");
+      return new DataStreamSource<>(source);
+    } else {
+      InputFormatSourceFunction<RowData> func = new 
InputFormatSourceFunction<>(getInputFormat(), typeInfo);
+      DataStreamSource<RowData> source = execEnv.addSource(func, 
explainSource(), typeInfo);
+      return source.name("streaming_source")
+          .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
+          .uid("uid_streaming_source");
+    }
   }
 
   @Override
   public boolean isBounded() {
-    return true;
+    return !conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
   }
 
   @Override
@@ -189,24 +213,7 @@ public class HoodieTableSource implements
 
   @Override
   public List<Map<String, String>> getPartitions() {
-    try {
-      return FilePathUtils
-          .searchPartKeyValueAndPaths(
-              path.getFileSystem(),
-              path,
-              conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
-              partitionKeys.toArray(new String[0]))
-          .stream()
-          .map(tuple2 -> tuple2.f0)
-          .map(spec -> {
-            LinkedHashMap<String, String> ret = new LinkedHashMap<>();
-            spec.forEach((k, v) -> ret.put(k, defaultPartName.equals(v) ? null 
: v));
-            return ret;
-          })
-          .collect(Collectors.toList());
-    } catch (Exception e) {
-      throw new TableException("Fetch partitions fail.", e);
-    }
+    return FilePathUtils.getPartitions(path, conf, partitionKeys, 
defaultPartName);
   }
 
   @Override
@@ -269,7 +276,7 @@ public class HoodieTableSource implements
             : Option.of(kv.getValue());
         return new MergeOnReadInputSplit(cnt.getAndAdd(1),
             baseFile.getPath(), logPaths, latestCommit,
-            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType);
+            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, 
null);
       }).collect(Collectors.toList());
     } else {
       // all the files are logs
@@ -285,15 +292,19 @@ public class HoodieTableSource implements
                   .collect(Collectors.toList()));
               return new MergeOnReadInputSplit(cnt.getAndAdd(1),
                   null, logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType);
+                  metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, null);
             }).collect(Collectors.toList()); })
           .flatMap(Collection::stream)
           .collect(Collectors.toList());
     }
   }
 
-  @VisibleForTesting
   public InputFormat<RowData, ?> getInputFormat() {
+    return getInputFormat(false);
+  }
+
+  @VisibleForTesting
+  public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
     // When this table has no partition, just return an empty source.
     if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
       return new CollectionInputFormat<>(Collections.emptyList(), null);
@@ -317,13 +328,20 @@ public class HoodieTableSource implements
 
     final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
     if (queryType.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
-      switch (this.conf.getString(FlinkOptions.TABLE_TYPE)) {
-        case FlinkOptions.TABLE_TYPE_MERGE_ON_READ:
-          final List<MergeOnReadInputSplit> inputSplits = 
buildFileIndex(paths);
-          if (inputSplits.size() == 0) {
-            // When there is no input splits, just return an empty source.
-            LOG.warn("No input inputs generate for MERGE_ON_READ input format, 
returns empty collection instead");
-            return new CollectionInputFormat<>(Collections.emptyList(), null);
+      final HoodieTableType tableType = 
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+      switch (tableType) {
+        case MERGE_ON_READ:
+          final List<MergeOnReadInputSplit> inputSplits;
+          if (!isStreaming) {
+            inputSplits = buildFileIndex(paths);
+            if (inputSplits.size() == 0) {
+              // When there is no input splits, just return an empty source.
+              LOG.warn("No input splits generate for MERGE_ON_READ input 
format, returns empty collection instead");
+              return new CollectionInputFormat<>(Collections.emptyList(), 
null);
+            }
+          } else {
+            // streaming reader would build the splits automatically.
+            inputSplits = Collections.emptyList();
           }
           final MergeOnReadTableState hoodieTableState = new 
MergeOnReadTableState(
               rowType,
@@ -335,10 +353,10 @@ public class HoodieTableSource implements
               this.conf,
               paths,
               hoodieTableState,
-              rowDataType.getChildren(), // use the explicit fields data type 
because the AvroSchemaConvertr is not very stable.
+              rowDataType.getChildren(), // use the explicit fields data type 
because the AvroSchemaConverter is not very stable.
               "default",
               this.limit);
-        case FlinkOptions.TABLE_TYPE_COPY_ON_WRITE:
+        case COPY_ON_WRITE:
           FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
               paths,
               this.schema.getFieldNames(),
@@ -373,27 +391,9 @@ public class HoodieTableSource implements
    */
   @VisibleForTesting
   public Path[] getReadPaths() {
-    if (partitionKeys.isEmpty()) {
-      return new Path[] {path};
-    } else {
-      return getOrFetchPartitions().stream()
-          .map(HoodieTableSource.this::validateAndReorderPartitions)
-          .map(kvs -> FilePathUtils.generatePartitionPath(kvs, 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
-          .map(n -> new Path(path, n))
-          .toArray(Path[]::new);
-    }
-  }
-
-  private LinkedHashMap<String, String> 
validateAndReorderPartitions(Map<String, String> part) {
-    LinkedHashMap<String, String> map = new LinkedHashMap<>();
-    for (String k : partitionKeys) {
-      if (!part.containsKey(k)) {
-        throw new TableException("Partition keys are: " + partitionKeys
-            + ", incomplete partition spec: " + part);
-      }
-      map.put(k, part.get(k));
-    }
-    return map;
+    return partitionKeys.isEmpty()
+        ? new Path[] {path}
+        : FilePathUtils.partitionPath2ReadPath(path, conf, partitionKeys, 
getOrFetchPartitions());
   }
 
   private static class LatestFileFilter extends FilePathFilter {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
index b025d9a..03bf53d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/format/FilePathUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.source.format;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.operator.FlinkOptions;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileStatus;
@@ -35,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Reference the Flink {@link org.apache.flink.table.utils.PartitionPathUtils}
@@ -317,4 +319,110 @@ public class FilePathUtils {
     // the log files is hidden file
     return name.startsWith("_") || name.startsWith(".") && 
!name.contains(".log.");
   }
+
+  /**
+   * Returns the partition path key and values as a list of map, each map item 
in the list
+   * is a mapping of the partition key name to its actual partition value. For 
example, say
+   * there is a file path with partition keys [key1, key2, key3]:
+   *
+   * <p><pre>
+   *   -- file:/// ... key1=val1/key2=val2/key3=val3
+   *   -- file:/// ... key1=val4/key2=val5/key3=val6
+   * </pre>
+   *
+   * <p>The return list should be [{key1:val1, key2:val2, key3:val3}, 
{key1:val4, key2:val5, key3:val6}].
+   *
+   * @param path The base path
+   * @param conf The configuration
+   * @param partitionKeys The partition key list
+   * @param defaultParName The default partition name for nulls
+   */
+  public static List<Map<String, String>> getPartitions(
+      Path path,
+      org.apache.flink.configuration.Configuration conf,
+      List<String> partitionKeys,
+      String defaultParName) {
+    try {
+      return FilePathUtils
+          .searchPartKeyValueAndPaths(
+              path.getFileSystem(),
+              path,
+              conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
+              partitionKeys.toArray(new String[0]))
+          .stream()
+          .map(tuple2 -> tuple2.f0)
+          .map(spec -> {
+            LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+            spec.forEach((k, v) -> ret.put(k, defaultParName.equals(v) ? null 
: v));
+            return ret;
+          })
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new TableException("Fetch partitions fail.", e);
+    }
+  }
+
+  /**
+   * Reorder the partition key value mapping based on the given partition keys 
sequence.
+   *
+   * @param partitionKVs  The partition key and value mapping
+   * @param partitionKeys The partition key list
+   */
+  public static LinkedHashMap<String, String> validateAndReorderPartitions(
+      Map<String, String> partitionKVs,
+      List<String> partitionKeys) {
+    LinkedHashMap<String, String> map = new LinkedHashMap<>();
+    for (String k : partitionKeys) {
+      if (!partitionKVs.containsKey(k)) {
+        throw new TableException("Partition keys are: " + partitionKeys
+            + ", incomplete partition spec: " + partitionKVs);
+      }
+      map.put(k, partitionKVs.get(k));
+    }
+    return map;
+  }
+
+  /**
+   * Returns all the file paths that is the parents of the data files.
+   *
+   * @param path The base path
+   * @param conf The configuration
+   * @param partitionKeys The partition key list
+   * @param defaultParName The default partition name for nulls
+   */
+  public static Path[] getReadPaths(
+      Path path,
+      org.apache.flink.configuration.Configuration conf,
+      List<String> partitionKeys,
+      String defaultParName) {
+    if (partitionKeys.isEmpty()) {
+      return new Path[] {path};
+    } else {
+      List<Map<String, String>> partitionPaths =
+          getPartitions(path, conf, partitionKeys, defaultParName);
+      return partitionPath2ReadPath(path, conf, partitionKeys, partitionPaths);
+    }
+  }
+
+  /**
+   * Transforms the given partition key value mapping to read paths.
+   *
+   * @param path The base path
+   * @param conf The hadoop configuration
+   * @param partitionKeys The partition key list
+   * @param partitionPaths The partition key value mapping
+   *
+   * @see #getReadPaths
+   */
+  public static Path[] partitionPath2ReadPath(
+      Path path,
+      org.apache.flink.configuration.Configuration conf,
+      List<String> partitionKeys,
+      List<Map<String, String>> partitionPaths) {
+    return partitionPaths.stream()
+        .map(m -> validateAndReorderPartitions(m, partitionKeys))
+        .map(kvs -> FilePathUtils.generatePartitionPath(kvs, 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION)))
+        .map(n -> new Path(path, n))
+        .toArray(Path[]::new);
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java
new file mode 100644
index 0000000..62f34db
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.mor;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A instant commits range used for incremental reader filtering.
+ */
+public abstract class InstantRange implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  protected final String startInstant;
+  protected final String endInstant;
+
+  public InstantRange(String startInstant, String endInstant) {
+    this.startInstant = Objects.requireNonNull(startInstant);
+    this.endInstant = Objects.requireNonNull(endInstant);
+  }
+
+  public static InstantRange getInstance(String startInstant, String 
endInstant, RangeType rangeType) {
+    switch (rangeType) {
+      case OPEN_CLOSE:
+        return new OpenCloseRange(startInstant, endInstant);
+      case CLOSE_CLOSE:
+        return new CloseCloseRange(startInstant, endInstant);
+      default:
+        throw new AssertionError();
+    }
+  }
+
+  public String getStartInstant() {
+    return startInstant;
+  }
+
+  public String getEndInstant() {
+    return endInstant;
+  }
+
+  public abstract boolean isInRange(String instant);
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * Represents a range type.
+   */
+  public enum RangeType {
+    OPEN_CLOSE, CLOSE_CLOSE;
+  }
+
+  private static class OpenCloseRange extends InstantRange {
+
+    public OpenCloseRange(String startInstant, String endInstant) {
+      super(startInstant, endInstant);
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      // No need to do comparison:
+      // HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
+      // because the logic is ensured by the log scanner
+      return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, startInstant);
+    }
+  }
+
+  private static class CloseCloseRange extends InstantRange {
+
+    public CloseCloseRange(String startInstant, String endInstant) {
+      super(startInstant, endInstant);
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      // No need to do comparison:
+      // HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
+      // because the logic is ensured by the log scanner
+      return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, startInstant);
+    }
+  }
+}
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
index ebd91af..510b5b5 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputFormat.java
@@ -287,6 +287,17 @@ public class MergeOnReadInputFormat
             // delete record found, skipping
             return hasNext();
           } else {
+            // should improve the code when log scanner supports
+            // seeking by log blocks with commit time which is more
+            // efficient.
+            if (split.getInstantRange().isPresent()) {
+              // based on the fact that commit time is always the first field
+              String commitTime = curAvroRecord.get().get(0).toString();
+              if (!split.getInstantRange().get().isInRange(commitTime)) {
+                // filter out the records that are not in range
+                return hasNext();
+              }
+            }
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
                 curAvroRecord.get(),
                 requiredSchema,
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java
 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java
index 5cf0aff..a73e93a 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/format/mor/MergeOnReadInputSplit.java
@@ -39,6 +39,7 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final String tablePath;
   private final long maxCompactionMemoryInBytes;
   private final String mergeType;
+  private final Option<InstantRange> instantRange;
 
   public MergeOnReadInputSplit(
       int splitNum,
@@ -47,7 +48,8 @@ public class MergeOnReadInputSplit implements InputSplit {
       String latestCommit,
       String tablePath,
       long maxCompactionMemoryInBytes,
-      String mergeType) {
+      String mergeType,
+      @Nullable InstantRange instantRange) {
     this.splitNum = splitNum;
     this.basePath = Option.ofNullable(basePath);
     this.logPaths = logPaths;
@@ -55,6 +57,7 @@ public class MergeOnReadInputSplit implements InputSplit {
     this.tablePath = tablePath;
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.mergeType = mergeType;
+    this.instantRange = Option.ofNullable(instantRange);
   }
 
   public Option<String> getBasePath() {
@@ -81,6 +84,10 @@ public class MergeOnReadInputSplit implements InputSplit {
     return mergeType;
   }
 
+  public Option<InstantRange> getInstantRange() {
+    return this.instantRange;
+  }
+
   @Override
   public int getSplitNumber() {
     return this.splitNum;
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index d110bff..29f5de8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -71,7 +71,7 @@ public class HoodieFlinkStreamer {
     }
 
     Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
-    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
 
     TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
 
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
index 27fd4f5..d217e08 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -72,7 +72,7 @@ public class HoodieFlinkStreamerV2 {
         (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
             .getLogicalType();
     Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
-    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index fdab92b..830c23a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,27 +18,26 @@
 
 package org.apache.hudi.util;
 
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.TablePathUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
-import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 
 import org.apache.avro.Schema;
@@ -332,7 +331,7 @@ public class StreamerUtil {
   public static boolean needsScheduleCompaction(Configuration conf) {
     return conf.getString(FlinkOptions.TABLE_TYPE)
         .toUpperCase(Locale.ROOT)
-        .equals(HoodieTableType.MERGE_ON_READ.name())
+        .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
         && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
   }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
index c28383e..c38b715 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
@@ -20,15 +20,19 @@ package org.apache.hudi.operator.utils;
 
 import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.utils.factory.CollectSinkTableFactory;
 import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 
 import java.util.Map;
 import java.util.Objects;
@@ -56,6 +60,13 @@ public class TestConfigurations {
           ROW_DATA_TYPE.getChildren().toArray(new DataType[0]))
       .build();
 
+  public static final TypeInformation<Row> ROW_TYPE_INFO = Types.ROW(
+      Types.STRING,
+      Types.STRING,
+      Types.INT,
+      Types.LOCAL_DATE_TIME,
+      Types.STRING);
+
   public static String getCreateHoodieTableDDL(String tableName, Map<String, 
String> options) {
     String createTable = "create table " + tableName + "(\n"
         + "  uuid varchar(20),\n"
@@ -77,8 +88,12 @@ public class TestConfigurations {
   }
 
   public static String getFileSourceDDL(String tableName) {
+    return getFileSourceDDL(tableName, "test_source.data");
+  }
+
+  public static String getFileSourceDDL(String tableName, String fileName) {
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
-        .getContextClassLoader().getResource("test_source.data")).toString();
+        .getContextClassLoader().getResource(fileName)).toString();
     return "create table " + tableName + "(\n"
         + "  uuid varchar(20),\n"
         + "  name varchar(10),\n"
@@ -91,6 +106,18 @@ public class TestConfigurations {
         + ")";
   }
 
+  public static String getCollectSinkDDL(String tableName) {
+    return "create table " + tableName + "(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(10),\n"
+        + "  age int,\n"
+        + "  ts timestamp(3),\n"
+        + "  `partition` varchar(20)\n"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+  }
+
   public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new 
ExecutionConfig(), ROW_TYPE);
 
   public static Configuration getDefaultConf(String tablePath) {
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java 
b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index 9e671e6..efeb0dd 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.data.writer.BinaryWriter;
 import org.apache.flink.table.runtime.types.InternalSerializers;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.Strings;
@@ -117,6 +118,52 @@ public class TestData {
             TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
   }
 
+  // data set of test_source.data
+  public static List<RowData> DATA_SET_FOUR = Arrays.asList(
+      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
+          TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 
53,
+          TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 
31,
+          TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 
18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 
20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
+  // merged data set of test_source.data and test_source2.data
+  public static List<RowData> DATA_SET_FIVE = Arrays.asList(
+      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
24,
+          TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 34,
+          TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 
54,
+          TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 
32,
+          TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 
18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 
20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4")),
+      binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 
19,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 
38,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      binaryRow(StringData.fromString("id11"), 
StringData.fromString("Phoebe"), 52,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   /**
    * Returns string format of a list of RowData.
    */
@@ -159,24 +206,78 @@ public class TestData {
   }
 
   /**
-   * Checks the source data TestConfigurations.DATA_SET_ONE are written as 
expected.
+   * Sort the {@code rows} using field at index 0 and asserts
+   * it equals with the expected string {@code expected}.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected string of the sorted rows
+   */
+  public static void assertRowsEquals(List<Row> rows, String expected) {
+    String rowsString = rows.stream()
+        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+        .collect(Collectors.toList()).toString();
+    assertThat(rowsString, is(expected));
+  }
+
+  /**
+   * Sort the {@code rows} using field at index 0 and asserts
+   * it equals with the expected row data list {@code expected}.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected row data list
+   */
+  public static void assertRowsEquals(List<Row> rows, List<RowData> expected) {
+    String rowsString = rows.stream()
+        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+        .collect(Collectors.toList()).toString();
+    assertThat(rowsString, is(rowDataToString(expected)));
+  }
+
+  /**
+   * Sort the {@code rows} using field at index 0 and asserts
+   * it equals with the expected string {@code expected}.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected string of the sorted rows
+   */
+  public static void assertRowDataEquals(List<RowData> rows, String expected) {
+    String rowsString = rowDataToString(rows);
+    assertThat(rowsString, is(expected));
+  }
+
+  /**
+   * Sort the {@code rows} using field at index 0 and asserts
+   * it equals with the expected row data list {@code expected}.
+   *
+   * @param rows     Actual result rows
+   * @param expected Expected row data list
+   */
+  public static void assertRowDataEquals(List<RowData> rows, List<RowData> 
expected) {
+    String rowsString = rowDataToString(rows);
+    assertThat(rowsString, is(rowDataToString(expected)));
+  }
+
+  /**
+   * Checks the source data set are written as expected.
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
    * @param baseFile The file base to check, should be a directory
    * @param expected The expected results mapping, the key should be the 
partition path
+   *                 and value should be values list with the key partition
    */
   public static void checkWrittenData(File baseFile, Map<String, String> 
expected) throws IOException {
     checkWrittenData(baseFile, expected, 4);
   }
 
   /**
-   * Checks the source data TestConfigurations.DATA_SET_ONE are written as 
expected.
+   * Checks the source data set are written as expected.
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
    * @param baseFile   The file base to check, should be a directory
    * @param expected   The expected results mapping, the key should be the 
partition path
+   *                   and value should be values list with the key partition
    * @param partitions The expected partition number
    */
   public static void checkWrittenData(
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
index 0879545..ca110db 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/HoodieDataSourceITCase.java
@@ -20,7 +20,11 @@ package org.apache.hudi.source;
 
 import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
+import org.apache.hudi.utils.factory.CollectSinkTableFactory;
 
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -34,16 +38,16 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.util.Comparator;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.apache.hudi.operator.utils.TestData.assertRowsEquals;
 
 /**
  * IT cases for Hoodie table source and sink.
@@ -73,6 +77,68 @@ public class HoodieDataSourceITCase extends AbstractTestBase 
{
   File tempFile;
 
   @Test
+  void testStreamWriteAndRead() throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(),
+        Objects.requireNonNull(Thread.currentThread()
+            
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_FOUR);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, insertInto);
+    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows2, TestData.DATA_SET_FOUR);
+  }
+
+  @Test
+  void testStreamReadAppendData() throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    String createSource2 = TestConfigurations.getFileSourceDDL("source2", 
"test_source2.data");
+    streamTableEnv.executeSql(createSource);
+    streamTableEnv.executeSql(createSource2);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(),
+        Objects.requireNonNull(Thread.currentThread()
+            
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    String createHoodieTable = 
TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(createHoodieTable);
+    String insertInto = "insert into t1 select * from source";
+    // execute 2 times
+    execInsertSql(streamTableEnv, insertInto);
+    // remember the commit
+    String specifiedCommit = 
TestUtils.getFirstCommit(tempFile.getAbsolutePath());
+    // another update batch
+    String insertInto2 = "insert into t1 select * from source2";
+    execInsertSql(streamTableEnv, insertInto2);
+    // now we consume starting from the oldest commit
+    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), 
specifiedCommit);
+    String createHoodieTable2 = 
TestConfigurations.getCreateHoodieTableDDL("t2", options);
+    streamTableEnv.executeSql(createHoodieTable2);
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
+    // all the data with same keys are appended within one data bucket and one 
log file,
+    // so when consume, the same keys are merged
+    assertRowsEquals(rows, TestData.DATA_SET_FIVE);
+  }
+
+  @Test
   void testStreamWriteBatchRead() {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
@@ -90,15 +156,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
 
     List<Row> rows = CollectionUtil.iterableToList(
         () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
-    final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, "
-        + "id2,Stephen,33,1970-01-01T00:00:02,par1, "
-        + "id3,Julian,53,1970-01-01T00:00:03,par2, "
-        + "id4,Fabian,31,1970-01-01T00:00:04,par2, "
-        + "id5,Sophia,18,1970-01-01T00:00:05,par3, "
-        + "id6,Emma,20,1970-01-01T00:00:06,par3, "
-        + "id7,Bob,44,1970-01-01T00:00:07,par4, "
-        + "id8,Han,56,1970-01-01T00:00:08,par4]";
-    assertRowsEquals(rows, expected);
+    assertRowsEquals(rows, TestData.DATA_SET_FOUR);
   }
 
   @Test
@@ -124,29 +182,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
 
     List<Row> rows = CollectionUtil.iterableToList(
         () -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
-    final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, "
-        + "id2,Stephen,33,1970-01-01T00:00:02,par1, "
-        + "id3,Julian,53,1970-01-01T00:00:03,par2, "
-        + "id4,Fabian,31,1970-01-01T00:00:04,par2, "
-        + "id5,Sophia,18,1970-01-01T00:00:05,par3, "
-        + "id6,Emma,20,1970-01-01T00:00:06,par3, "
-        + "id7,Bob,44,1970-01-01T00:00:07,par4, "
-        + "id8,Han,56,1970-01-01T00:00:08,par4]";
-    assertRowsEquals(rows, expected);
-  }
-
-  /**
-   * Sort the {@code rows} using field at index 0 and asserts
-   * it equals with the expected string {@code expected}.
-   *
-   * @param rows     Actual result rows
-   * @param expected Expected string of the sorted rows
-   */
-  private static void assertRowsEquals(List<Row> rows, String expected) {
-    String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
-        .collect(Collectors.toList()).toString();
-    assertThat(rowsString, is(expected));
+    assertRowsEquals(rows, TestData.DATA_SET_FOUR);
   }
 
   private void execInsertSql(TableEnvironment tEnv, String insert) {
@@ -159,4 +195,16 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
       throw new RuntimeException(ex);
     }
   }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long 
timeout) throws InterruptedException {
+    tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink"));
+    TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+    // wait for the timeout then cancels the job
+    TimeUnit.SECONDS.sleep(timeout);
+    tableResult.getJobClient().ifPresent(JobClient::cancel);
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    return CollectSinkTableFactory.RESULT.values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
similarity index 97%
rename from 
hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java
rename to 
hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
index f799457..af50cf0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/HoodieTableSourceTest.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieTableSource.java
@@ -54,8 +54,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 /**
  * Test cases for HoodieTableSource.
  */
-public class HoodieTableSourceTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTableSourceTest.class);
+public class TestHoodieTableSource {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestHoodieTableSource.class);
 
   private Configuration conf;
 
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
new file mode 100644
index 0000000..f02a28c
--- /dev/null
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadMonitoringFunction}.
+ */
+public class TestStreamReadMonitoringFunction {
+  private static final long WAIT_TIME_MILLIS = 5 * 1000L;
+
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.setInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2); // check 
every 2 seconds
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  public void testConsumeFromLatestCommit() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+      Thread.sleep(1000L);
+
+      // reset the source context
+      latch = new CountDownLatch(4);
+      sourceContext.reset(latch);
+
+      // write another instant and validate
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromSpecifiedCommit() throws Exception {
+    // write 2 commits first, use the second commit time as the specified 
start instant,
+    // all the splits should come from the second commit.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    String specifiedCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(specifiedCommit)),
+          "All the splits should be with specified instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      Thread.sleep(1000L);
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+    }
+
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function2)) {
+      harness.setup();
+      // Recover to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+      runAsync(sourceContext, function2);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+    }
+  }
+
+  private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> 
createHarness(
+      StreamReadMonitoringFunction function) throws Exception {
+    StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> 
streamSource = new StreamSource<>(function);
+    return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
+  }
+
+  private void runAsync(
+      CollectingSourceContext sourceContext,
+      StreamReadMonitoringFunction function) {
+    Thread task = new Thread(() -> {
+      try {
+        function.run(sourceContext);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    task.start();
+  }
+
+  /**
+   * Source context that collects the outputs in to a list.
+   */
+  private static class CollectingSourceContext implements 
SourceFunction.SourceContext<MergeOnReadInputSplit> {
+    private final List<MergeOnReadInputSplit> splits = new ArrayList<>();
+    private final Object checkpointLock = new Object();
+    private volatile CountDownLatch latch;
+
+    CollectingSourceContext(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void collect(MergeOnReadInputSplit element) {
+      splits.add(element);
+      latch.countDown();
+    }
+
+    @Override
+    public void collectWithTimestamp(MergeOnReadInputSplit element, long 
timestamp) {
+      collect(element);
+    }
+
+    @Override
+    public void emitWatermark(Watermark mark) {
+
+    }
+
+    @Override
+    public void markAsTemporarilyIdle() {
+
+    }
+
+    @Override
+    public Object getCheckpointLock() {
+      return checkpointLock;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    public void reset(CountDownLatch latch) {
+      this.latch = latch;
+      this.splits.clear();
+    }
+
+    public String getPartitionPaths() {
+      return this.splits.stream()
+          .map(TestUtils::getSplitPartitionPath)
+          .sorted(Comparator.naturalOrder())
+          .collect(Collectors.joining(","));
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
new file mode 100644
index 0000000..e13f950
--- /dev/null
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.StreamReadOperator;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.source.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadOperator}.
+ */
+public class TestStreamReadOperator {
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+  static {
+    EXPECTED.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, 
id2,Stephen,33,1970-01-01T00:00:00.002,par1");
+    EXPECTED.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, 
id4,Fabian,31,1970-01-01T00:00:00.004,par2");
+    EXPECTED.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, 
id6,Emma,20,1970-01-01T00:00:00.006,par3");
+    EXPECTED.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, 
id8,Han,56,1970-01-01T00:00:00.008,par4");
+  }
+
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  void testWriteRecords() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(conf);
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+      for (MergeOnReadInputSplit split : splits) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should process 1 split", processor.runMailboxStep());
+      }
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
+      assertThat("Should have 4 splits", splits2.size(), is(4));
+      for (MergeOnReadInputSplit split : splits2) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should processed 1 split", processor.runMailboxStep());
+      }
+      // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
+      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
+      expected.addAll(TestData.DATA_SET_TWO);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
+    }
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // Received emitted splits: split1, split2, split3, split4, checkpoint 
request is triggered
+    // when reading records from split1.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(conf);
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      for (MergeOnReadInputSplit split : splits) {
+        harness.processElement(split, ++timestamp);
+      }
+
+      // Trigger snapshot state, it will start to work once all records from 
split0 are read.
+      processor.getMainMailboxExecutor()
+          .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split0");
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
snapshot state action");
+
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is(getSplitExpected(Collections.singletonList(splits.get(0)), 
EXPECTED)));
+
+      // Read records from split1.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split1");
+
+      // Read records from split2.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split2");
+
+      // Read records from split3.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split3");
+
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+    OperatorSubtaskState state;
+    final List<MergeOnReadInputSplit> splits;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      StreamReadMonitoringFunction func = TestUtils.getMonitorFunc(conf);
+
+      splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      // Enqueue all the splits.
+      for (MergeOnReadInputSplit split : splits) {
+        harness.processElement(split, -1);
+      }
+
+      // Read all records from the first 2 splits.
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+      for (int i = 0; i < 2; i++) {
+        assertTrue(localMailbox.runMailboxStep(), "Should have processed the 
split#" + i);
+      }
+
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is(getSplitExpected(splits.subList(0, 2), EXPECTED)));
+
+      // Snapshot state now,  there are 2 splits left in the state.
+      state = harness.snapshot(1, 1);
+    }
+
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      // Recover to process the remaining splits.
+      harness.initializeState(state);
+      harness.open();
+
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+
+      for (int i = 2; i < 4; i++) {
+        assertTrue(localMailbox.runMailboxStep(), "Should have processed one 
split#" + i);
+      }
+
+      // expect to output the left data
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is(getSplitExpected(splits.subList(2, 4), EXPECTED)));
+    }
+  }
+
+  private static String getSplitExpected(List<MergeOnReadInputSplit> splits, 
Map<String, String> expected) {
+    return splits.stream()
+        .map(TestUtils::getSplitPartitionPath)
+        .map(expected::get)
+        .sorted(Comparator.naturalOrder())
+        .collect(Collectors.toList()).toString();
+  }
+
+  private List<MergeOnReadInputSplit> 
generateSplits(StreamReadMonitoringFunction func) throws Exception {
+    final List<MergeOnReadInputSplit> splits = new ArrayList<>();
+    func.open(conf);
+    func.monitorDirAndForwardSplits(new CollectingSourceContext<>(new 
Object(), splits));
+    return splits;
+  }
+
+  private OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
createReader() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    final List<String> partitionKeys = Collections.singletonList("partition");
+
+    // This input format is used to opening the emitted split.
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    final Schema tableAvroSchema;
+    try {
+      tableAvroSchema = schemaUtil.getTableAvroSchema();
+    } catch (Exception e) {
+      throw new HoodieException("Get table avro schema error", e);
+    }
+    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+    final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+        rowType,
+        TestConfigurations.ROW_TYPE,
+        tableAvroSchema.toString(),
+        
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
+        Collections.emptyList());
+    Path[] paths = FilePathUtils.getReadPaths(
+        new Path(basePath), conf, partitionKeys, 
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME));
+    MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat(
+        conf,
+        paths,
+        hoodieTableState,
+        rowDataType.getChildren(),
+        "default",
+        1000L);
+
+    OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory(inputFormat);
+    OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness 
= new OneInputStreamOperatorTestHarness<>(
+        factory, 1, 1, 0);
+    
harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+    return harness;
+  }
+
+  private SteppingMailboxProcessor createLocalMailbox(
+      OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness) {
+    return new SteppingMailboxProcessor(
+        MailboxDefaultAction.Controller::suspendDefaultAction,
+        harness.getTaskMailbox(),
+        StreamTaskActionExecutor.IMMEDIATE);
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
similarity index 91%
rename from 
hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java
rename to 
hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
index 8bb5299..7774e56 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/format/InputFormatTest.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/format/TestInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.format;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.operator.utils.TestConfigurations;
 import org.apache.hudi.operator.utils.TestData;
@@ -32,7 +33,7 @@ import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -48,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 /**
  * Test cases for MergeOnReadInputFormat and ParquetInputFormat.
  */
-public class InputFormatTest {
+public class TestInputFormat {
 
   private HoodieTableSource tableSource;
   private Configuration conf;
@@ -56,9 +57,9 @@ public class InputFormatTest {
   @TempDir
   File tempFile;
 
-  void beforeEach(String tableType) throws IOException {
+  void beforeEach(HoodieTableType tableType) throws IOException {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    conf.setString(FlinkOptions.TABLE_TYPE, tableType);
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close 
the async compaction
 
     StreamerUtil.initTableIfNotExists(conf);
@@ -71,10 +72,8 @@ public class InputFormatTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {
-      FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
-      FlinkOptions.TABLE_TYPE_MERGE_ON_READ})
-  void testRead(String tableType) throws Exception {
+  @EnumSource(value = HoodieTableType.class)
+  void testRead(HoodieTableType tableType) throws Exception {
     beforeEach(tableType);
 
     TestData.writeData(TestData.DATA_SET_ONE, conf);
@@ -113,7 +112,7 @@ public class InputFormatTest {
 
   @Test
   void testReadBaseAndLogFiles() throws Exception {
-    beforeEach(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    beforeEach(HoodieTableType.MERGE_ON_READ);
 
     // write parquet first with compaction
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
@@ -153,10 +152,8 @@ public class InputFormatTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {
-      FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
-      FlinkOptions.TABLE_TYPE_MERGE_ON_READ})
-  void testReadWithPartitionPrune(String tableType) throws Exception {
+  @EnumSource(value = HoodieTableType.class)
+  void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
     beforeEach(tableType);
 
     TestData.writeData(TestData.DATA_SET_ONE, conf);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
new file mode 100644
index 0000000..fa021f3
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Common test utils.
+ */
+public class TestUtils {
+
+  public static String getLatestCommit(String basePath) {
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
+  }
+
+  public static String getFirstCommit(String basePath) {
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp();
+  }
+
+  public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
+    assertTrue(split.getLogPaths().isPresent());
+    final String logPath = split.getLogPaths().get().get(0);
+    String[] paths = logPath.split(File.separator);
+    return paths[paths.length - 2];
+  }
+
+  public static StreamReadMonitoringFunction getMonitorFunc(Configuration 
conf) {
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return new StreamReadMonitoringFunction(conf, new Path(basePath), 
metaClient, 1024 * 1024L);
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
 
b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
new file mode 100644
index 0000000..902987e
--- /dev/null
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils.factory;
+
+import org.apache.hudi.operator.utils.TestConfigurations;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Factory for CollectTableSink.
+ *
+ * <p>Note: The CollectTableSink collects all the data of a table into a 
global collection {@code RESULT},
+ * so the tests should executed in single thread and the table name should be 
the same.
+ */
+public class CollectSinkTableFactory implements DynamicTableSinkFactory {
+  public static final String FACTORY_ID = "collect";
+
+  // global results to collect and query
+  public static final Map<Integer, List<Row>> RESULT = new HashMap<>();
+
+  @Override
+  public DynamicTableSink createDynamicTableSink(Context context) {
+    FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+    helper.validate();
+
+    TableSchema schema = context.getCatalogTable().getSchema();
+    RESULT.clear();
+    return new CollectTableSink(schema, 
context.getObjectIdentifier().getObjectName());
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return FACTORY_ID;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
+
+  // 
--------------------------------------------------------------------------------------------
+  // Table sinks
+  // 
--------------------------------------------------------------------------------------------
+
+  /**
+   * Values {@link DynamicTableSink} for testing.
+   */
+  private static class CollectTableSink implements DynamicTableSink {
+
+    private final TableSchema schema;
+    private final String tableName;
+
+    private CollectTableSink(
+        TableSchema schema,
+        String tableName) {
+      this.schema = schema;
+      this.tableName = tableName;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+      return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+      DataStructureConverter converter = 
context.createDataStructureConverter(schema.toPhysicalRowDataType());
+      return SinkFunctionProvider.of(new CollectSinkFunction(converter));
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+      return new CollectTableSink(schema, tableName);
+    }
+
+    @Override
+    public String asSummaryString() {
+      return "CollectSink";
+    }
+  }
+
+  static class CollectSinkFunction extends RichSinkFunction<RowData> 
implements CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final DynamicTableSink.DataStructureConverter converter;
+
+    protected transient ListState<Row> resultState;
+    protected transient List<Row> localResult;
+
+    private int taskID;
+
+    protected CollectSinkFunction(DynamicTableSink.DataStructureConverter 
converter) {
+      this.converter = converter;
+    }
+
+    @Override
+    public void invoke(RowData value, SinkFunction.Context context) {
+      if (value.getRowKind() == RowKind.INSERT) {
+        Row row = (Row) converter.toExternal(value);
+        assert row != null;
+        RESULT.get(taskID).add(row);
+      } else {
+        throw new RuntimeException(
+            "CollectSinkFunction received " + value.getRowKind() + " 
messages.");
+      }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+      this.resultState = context.getOperatorStateStore().getListState(
+          new ListStateDescriptor<>("sink-results", 
TestConfigurations.ROW_TYPE_INFO));
+      this.localResult = new ArrayList<>();
+      if (context.isRestored()) {
+        for (Row value : resultState.get()) {
+          localResult.add(value);
+        }
+      }
+      this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+      synchronized (CollectSinkTableFactory.class) {
+        RESULT.put(taskID, localResult);
+      }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+      resultState.clear();
+      resultState.addAll(RESULT.get(taskID));
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..5fec9b6
--- /dev/null
+++ 
b/hudi-flink/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hudi.utils.factory.CollectSinkTableFactory
diff --git a/hudi-flink/src/test/resources/test_source2.data 
b/hudi-flink/src/test/resources/test_source2.data
new file mode 100644
index 0000000..ff8265d
--- /dev/null
+++ b/hudi-flink/src/test/resources/test_source2.data
@@ -0,0 +1,8 @@
+{"uuid": "id1", "name": "Danny", "age": 24, "ts": "1970-01-01T00:00:01", 
"partition": "par1"}
+{"uuid": "id2", "name": "Stephen", "age": 34, "ts": "1970-01-01T00:00:02", 
"partition": "par1"}
+{"uuid": "id3", "name": "Julian", "age": 54, "ts": "1970-01-01T00:00:03", 
"partition": "par2"}
+{"uuid": "id4", "name": "Fabian", "age": 32, "ts": "1970-01-01T00:00:04", 
"partition": "par2"}
+{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", 
"partition": "par3"}
+{"uuid": "id9", "name": "Jane", "age": 19, "ts": "1970-01-01T00:00:06", 
"partition": "par3"}
+{"uuid": "id10", "name": "Ella", "age": 38, "ts": "1970-01-01T00:00:07", 
"partition": "par4"}
+{"uuid": "id11", "name": "Phoebe", "age": 52, "ts": "1970-01-01T00:00:08", 
"partition": "par4"}

Reply via email to