yanghua commented on a change in pull request #2640:
URL: https://github.com/apache/hudi/pull/2640#discussion_r589357979



##########
File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.core.fs.Path;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Common test utils.
+ */
+public class TestUtils {
+
+  public static String getLatestCommit(String basePath) {
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
+  }
+
+  public static String getFirstCommit(String basePath) {
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant().get().getTimestamp();
+  }
+
+  public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
+    assertTrue(split.getLogPaths().isPresent());
+    final String logPath = split.getLogPaths().get().get(0);
+    String[] paths = logPath.split(Path.SEPARATOR);

Review comment:
       If we change it to `File.separator`, we can make this class 
Flink-free, right? Then we can move it into a general util class that is not 
specific to Flink.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadMonitoringFunction}.
+ */
+public class TestStreamReadMonitoringFunction {
+  private static final long WAIT_TIME_MILLIS = 5 * 1000L;
+
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.setInteger(FlinkOptions.STREAMING_CHECK_INTERVAL, 2); // check every 
2 seconds
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  public void testConsumeFromLatestCommit() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+      Thread.sleep(1000L);
+
+      // reset the source context
+      latch = new CountDownLatch(4);
+      sourceContext.reset(latch);
+
+      // write another instant and validate
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromSpecifiedCommit() throws Exception {
+    // write 2 commits first, use the second commit time as the specified 
start instant,
+    // all the splits should come from the second commit.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    String specifiedCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.STREAMING_START_COMMIT, specifiedCommit);
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(specifiedCommit)),
+          "All the splits should be with specified instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      Thread.sleep(1000L);
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+    }
+
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    StreamReadMonitoringFunction function2 = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function2)) {
+      harness.setup();
+      // Recover to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runAsync(sourceContext, function2);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+    }
+  }
+
+  private StreamReadMonitoringFunction getMonitorFunc() {
+    final String basePath = tempFile.getAbsolutePath();
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    final List<String> partitionKeys = Collections.singletonList("partition");
+    return new StreamReadMonitoringFunction(conf, new Path(basePath), 
metaClient,
+        partitionKeys, 1024 * 1024L);
+  }
+
+  private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> 
createHarness(
+      StreamReadMonitoringFunction function) throws Exception {
+    StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> 
streamSource = new StreamSource<>(function);
+    return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
+  }
+
+  private void runAsync(
+      TestSourceContext sourceContext,
+      StreamReadMonitoringFunction function) {
+    Thread task = new Thread(() -> {
+      try {
+        function.run(sourceContext);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    task.start();
+  }
+
+  private static class TestSourceContext implements 
SourceFunction.SourceContext<MergeOnReadInputSplit> {

Review comment:
       Considering it's not a test class, it would be better to rename it to 
`SourceTestContext`.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.StreamReadOperator;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.source.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadOperator}.
+ */
+public class TestStreamReadOperator {
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  void testWriteRecords() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+      for (MergeOnReadInputSplit split : splits) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should process 1 split", processor.runMailboxStep());
+      }
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
+      assertThat("Should have 4 splits", splits2.size(), is(4));
+      for (MergeOnReadInputSplit split : splits2) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should processed 1 split", processor.runMailboxStep());
+      }
+      // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
+      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
+      expected.addAll(TestData.DATA_SET_TWO);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
+    }
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // Received emitted splits: split1, split2, split3, split4, checkpoint 
request is triggered
+    // when reading records from split1.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      harness.processElement(splits.get(0), ++timestamp);
+      harness.processElement(splits.get(1), ++timestamp);
+      harness.processElement(splits.get(2), ++timestamp);
+      harness.processElement(splits.get(3), ++timestamp);

Review comment:
       IMO, these repeated calls can be written as a for-loop.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -332,7 +331,7 @@ public static String getTablePath(FileSystem fs, Path[] 
userProvidedPaths) throw
   public static boolean needsScheduleCompaction(Configuration conf) {
     return conf.getString(FlinkOptions.TABLE_TYPE)
         .toUpperCase(Locale.ROOT)
-        .equals(HoodieTableType.MERGE_ON_READ.name())
+        .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)

Review comment:
       I still think the old one is better. We should have a single source of 
truth. If we define a new one, it will increase the maintenance cost.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.StreamReadOperator;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.source.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadOperator}.
+ */
+public class TestStreamReadOperator {
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  void testWriteRecords() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+      for (MergeOnReadInputSplit split : splits) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should process 1 split", processor.runMailboxStep());
+      }
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
+      assertThat("Should have 4 splits", splits2.size(), is(4));
+      for (MergeOnReadInputSplit split : splits2) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should processed 1 split", processor.runMailboxStep());
+      }
+      // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
+      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
+      expected.addAll(TestData.DATA_SET_TWO);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
+    }
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // Received emitted splits: split1, split2, split3, split4, checkpoint 
request is triggered
+    // when reading records from split1.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      harness.processElement(splits.get(0), ++timestamp);
+      harness.processElement(splits.get(1), ++timestamp);
+      harness.processElement(splits.get(2), ++timestamp);
+      harness.processElement(splits.get(3), ++timestamp);
+
+      // Trigger snapshot state, it will start to work once all records from 
split0 are read.
+      processor.getMainMailboxExecutor()
+          .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split0");
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
snapshot state action");
+
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is("[id3,Julian,53,1970-01-01T00:00:00.003,par2, 
id4,Fabian,31,1970-01-01T00:00:00.004,par2]"));

Review comment:
       We should avoid hard-coding these values.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.StreamReadOperator;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.source.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadOperator}.
+ */
+public class TestStreamReadOperator {
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  void testWriteRecords() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+      for (MergeOnReadInputSplit split : splits) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should process 1 split", processor.runMailboxStep());
+      }
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
+      assertThat("Should have 4 splits", splits2.size(), is(4));
+      for (MergeOnReadInputSplit split : splits2) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should processed 1 split", processor.runMailboxStep());
+      }
+      // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
+      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
+      expected.addAll(TestData.DATA_SET_TWO);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
+    }
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // Received emitted splits: split1, split2, split3, split4, checkpoint 
request is triggered
+    // when reading records from split1.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      harness.processElement(splits.get(0), ++timestamp);
+      harness.processElement(splits.get(1), ++timestamp);
+      harness.processElement(splits.get(2), ++timestamp);
+      harness.processElement(splits.get(3), ++timestamp);
+
+      // Trigger snapshot state, it will start to work once all records from 
split0 are read.
+      processor.getMainMailboxExecutor()
+          .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split0");
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
snapshot state action");
+
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is("[id3,Julian,53,1970-01-01T00:00:00.003,par2, 
id4,Fabian,31,1970-01-01T00:00:00.004,par2]"));
+
+      // Read records from split1.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split1");
+
+      // Read records from split2.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split2");
+
+      // Read records from split3.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split3");
+
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+    final Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, 
id2,Stephen,33,1970-01-01T00:00:00.002,par1");
+    expected.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, 
id4,Fabian,31,1970-01-01T00:00:00.004,par2");
+    expected.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, 
id6,Emma,20,1970-01-01T00:00:00.006,par3");
+    expected.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, 
id8,Han,56,1970-01-01T00:00:00.008,par4");
+
+    OperatorSubtaskState state;
+    final List<MergeOnReadInputSplit> splits;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      // Enqueue all the splits.
+      for (MergeOnReadInputSplit split : splits) {
+        harness.processElement(split, -1);
+      }
+
+      // Read all records from the first 2 splits.
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+      for (int i = 0; i < 2; i++) {
+        assertTrue(localMailbox.runMailboxStep(), "Should have processed the 
split#" + i);
+      }
+
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is(getSplitExpected(splits.subList(0, 2), expected)));
+
+      // Snapshot state now,  there are 2 splits left in the state.
+      state = harness.snapshot(1, 1);
+    }
+
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      // Recover to process the remaining splits.
+      harness.initializeState(state);
+      harness.open();
+
+      SteppingMailboxProcessor localMailbox = createLocalMailbox(harness);
+
+      for (int i = 2; i < 4; i++) {
+        assertTrue(localMailbox.runMailboxStep(), "Should have processed one 
split#" + i);
+      }
+
+      // expect to output the left data
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is(getSplitExpected(splits.subList(2, 4), expected)));
+    }
+  }
+
+  private static String getSplitExpected(List<MergeOnReadInputSplit> splits, 
Map<String, String> expected) {
+    return splits.stream()
+        .map(TestUtils::getSplitPartitionPath)
+        .map(expected::get)
+        .sorted(Comparator.naturalOrder())
+        .collect(Collectors.toList()).toString();
+  }
+
+  private StreamReadMonitoringFunction getMonitorFunc() {

Review comment:
       Would `getMonitorFuncObj` or `getMonitorFuncInstance` be a better name?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.mor;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A instant commits range used for incremental reader filtering.
+ */
+public abstract class InstantRange implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  protected final String startInstant;
+  protected final String endInstant;
+
+  public InstantRange(String startInstant, String endInstant) {
+    this.startInstant = Objects.requireNonNull(startInstant);
+    this.endInstant = Objects.requireNonNull(endInstant);
+  }
+
+  public static InstantRange getInstance(String startInstant, String 
endInstant, RangeType rangeType) {
+    switch (rangeType) {
+      case OPEN_CLOSE:
+        return new OpenCloseRange(startInstant, endInstant);
+      case CLOSE_CLOSE:
+        return new CloseCloseRange(startInstant, endInstant);
+      default:
+        throw new AssertionError();
+    }
+  }
+
+  public String getStartInstant() {
+    return startInstant;
+  }
+
+  public String getEndInstant() {
+    return endInstant;
+  }
+
+  public abstract boolean isInRange(String instant);
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * Represents a range type.
+   */
+  public enum RangeType {
+    OPEN_CLOSE, CLOSE_CLOSE;
+  }
+
+  private static class OpenCloseRange extends InstantRange {
+
+    public OpenCloseRange(String startInstant, String endInstant) {
+      super(startInstant, endInstant);
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      // && HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)

Review comment:
       `LESSER_THAN_OR_EQUALS`?

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadMonitoringFunction}.
+ */
+public class TestStreamReadMonitoringFunction {
+  private static final long WAIT_TIME_MILLIS = 5 * 1000L;
+
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.setInteger(FlinkOptions.STREAMING_CHECK_INTERVAL, 2); // check every 
2 seconds
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  public void testConsumeFromLatestCommit() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+      Thread.sleep(1000L);
+
+      // reset the source context
+      latch = new CountDownLatch(4);
+      sourceContext.reset(latch);
+
+      // write another instant and validate
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromSpecifiedCommit() throws Exception {
+    // write 2 commits first, use the second commit time as the specified 
start instant,
+    // all the splits should come from the second commit.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    String specifiedCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.STREAMING_START_COMMIT, specifiedCommit);
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(specifiedCommit)),
+          "All the splits should be with specified instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      Thread.sleep(1000L);
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+    }
+
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    StreamReadMonitoringFunction function2 = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function2)) {
+      harness.setup();
+      // Recover to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runAsync(sourceContext, function2);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+    }
+  }
+
+  private StreamReadMonitoringFunction getMonitorFunc() {

Review comment:
       Do we have the same method in `TestStreamReadOperator`?

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.StreamReadOperator;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.source.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
+import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadOperator}.
+ */
+public class TestStreamReadOperator {
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  void testWriteRecords() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+      for (MergeOnReadInputSplit split : splits) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should process 1 split", processor.runMailboxStep());
+      }
+      // Assert the output has expected elements.
+      TestData.assertRowDataEquals(harness.extractOutputValues(), 
TestData.DATA_SET_ONE);
+
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+      final List<MergeOnReadInputSplit> splits2 = generateSplits(func);
+      assertThat("Should have 4 splits", splits2.size(), is(4));
+      for (MergeOnReadInputSplit split : splits2) {
+        // Process this element to enqueue to mail-box.
+        harness.processElement(split, -1);
+
+        // Run the mail-box once to read all records from the given split.
+        assertThat("Should processed 1 split", processor.runMailboxStep());
+      }
+      // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO
+      List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE);
+      expected.addAll(TestData.DATA_SET_TWO);
+      TestData.assertRowDataEquals(harness.extractOutputValues(), expected);
+    }
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // Received emitted splits: split1, split2, split3, split4, checkpoint 
request is triggered
+    // when reading records from split1.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    long timestamp = 0;
+    try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> 
harness = createReader()) {
+      harness.setup();
+      harness.open();
+
+      SteppingMailboxProcessor processor = createLocalMailbox(harness);
+      StreamReadMonitoringFunction func = getMonitorFunc();
+
+      List<MergeOnReadInputSplit> splits = generateSplits(func);
+      assertThat("Should have 4 splits", splits.size(), is(4));
+
+      harness.processElement(splits.get(0), ++timestamp);
+      harness.processElement(splits.get(1), ++timestamp);
+      harness.processElement(splits.get(2), ++timestamp);
+      harness.processElement(splits.get(3), ++timestamp);
+
+      // Trigger snapshot state, it will start to work once all records from 
split0 are read.
+      processor.getMainMailboxExecutor()
+          .execute(() -> harness.snapshot(1, 3), "Trigger snapshot");
+
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split0");
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
snapshot state action");
+
+      assertThat(TestData.rowDataToString(harness.extractOutputValues()),
+          is("[id3,Julian,53,1970-01-01T00:00:00.003,par2, 
id4,Fabian,31,1970-01-01T00:00:00.004,par2]"));
+
+      // Read records from split1.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split1");
+
+      // Read records from split2.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split2");
+
+      // Read records from split3.
+      assertTrue(processor.runMailboxStep(), "Should have processed the 
split3");

Review comment:
       Can we use a loop to replace each invocation?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/source/format/mor/InstantRange.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.format.mor;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A instant commits range used for incremental reader filtering.
+ */
+public abstract class InstantRange implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  protected final String startInstant;
+  protected final String endInstant;
+
+  public InstantRange(String startInstant, String endInstant) {
+    this.startInstant = Objects.requireNonNull(startInstant);
+    this.endInstant = Objects.requireNonNull(endInstant);
+  }
+
+  public static InstantRange getInstance(String startInstant, String 
endInstant, RangeType rangeType) {
+    switch (rangeType) {
+      case OPEN_CLOSE:
+        return new OpenCloseRange(startInstant, endInstant);
+      case CLOSE_CLOSE:
+        return new CloseCloseRange(startInstant, endInstant);
+      default:
+        throw new AssertionError();
+    }
+  }
+
+  public String getStartInstant() {
+    return startInstant;
+  }
+
+  public String getEndInstant() {
+    return endInstant;
+  }
+
+  public abstract boolean isInRange(String instant);
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * Represents a range type.
+   */
+  public enum RangeType {
+    OPEN_CLOSE, CLOSE_CLOSE;
+  }
+
+  private static class OpenCloseRange extends InstantRange {
+
+    public OpenCloseRange(String startInstant, String endInstant) {
+      super(startInstant, endInstant);
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      // && HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)
+      // the logic is ensured by the log scanner
+      return HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN, startInstant);
+    }
+  }
+
+  private static class CloseCloseRange extends InstantRange {
+
+    public CloseCloseRange(String startInstant, String endInstant) {
+      super(startInstant, endInstant);
+    }
+
+    @Override
+    public boolean isInRange(String instant) {
+      // && HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant)

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to