danny0405 commented on a change in pull request #3368:
URL: https://github.com/apache/hudi/pull/3368#discussion_r679085726



##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
##########
@@ -118,6 +123,33 @@ void testStreamWriteAndRead(HoodieTableType tableType) 
throws Exception {
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    // Now when READ_STREAMING_START_COMMIT is not set, reading from latest 
commit instance
+    // instead of reading from beginning.
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);

Review comment:
       => reading from latest commit instance

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
##########
@@ -68,6 +68,36 @@ public void before() throws Exception {
     StreamerUtil.initTableIfNotExists(conf);
   }
 
+  @Test
+  public void testConsume() throws Exception {
+    // write 2 commits first, and all the splits should come from the second 
commit.

Review comment:
       testConsumeFromLatestCommit

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
##########
@@ -203,9 +203,11 @@ public void 
monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> cont
         instantRange = InstantRange.getInstance(specifiedStart, 
instantToIssue.getTimestamp(),
             InstantRange.RangeType.CLOSE_CLOSE);
       } else {
-        // first time consume and no start commit,
-        // would consume all the snapshot data PLUS incremental data set
-        instantRange = null;
+        // first time consume and no start commit. Change to consume the 
incremental data set
+        // of latest commit instead of all the snapshot data PLUS incremental 
data set.

Review comment:
       first time to consume and no start commit,
   consumes the latest incremental data set

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
##########
@@ -68,6 +68,36 @@ public void before() throws Exception {
     StreamerUtil.initTableIfNotExists(conf);
   }
 
+  @Test
+  public void testConsume() throws Exception {
+    // write 2 commits first, and all the splits should come from the second 
commit.
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      String latestCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(latestCommit)),
+          "All the splits should be with latestCommit instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
   @Test
   public void testConsumeFromLatestCommit() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);

Review comment:
       testConsumeFromLatestCommit => testConsumeFromLastCommit




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to