This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b19ec9  [HUDI-2252] Default consumes from the latest instant for flink streaming reader (#3368)
8b19ec9 is described below

commit 8b19ec9ca07070a9819502e82091dd14d559ef94
Author: swuferhong <[email protected]>
AuthorDate: Fri Jul 30 14:25:05 2021 +0800

    [HUDI-2252] Default consumes from the latest instant for flink streaming reader (#3368)
---
 .../hudi/source/StreamReadMonitoringFunction.java  |  7 ++--
 .../source/TestStreamReadMonitoringFunction.java   | 38 +++++++++++++++++++---
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 33 ++++++++++++++++++-
 .../test/java/org/apache/hudi/utils/TestData.java  | 12 +++++++
 4 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 983c19f..92c06e9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -203,9 +203,10 @@ public class StreamReadMonitoringFunction
         instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
             InstantRange.RangeType.CLOSE_CLOSE);
       } else {
-        // first time consume and no start commit,
-        // would consume all the snapshot data PLUS incremental data set
-        instantRange = null;
+        // first time consuming and no start commit: consume the latest incremental data set.
+        HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+        instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
+            InstantRange.RangeType.CLOSE_CLOSE);
       }
     } else {
       LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 406da32..f145744 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -70,7 +70,9 @@ public class TestStreamReadMonitoringFunction {
 
   @Test
   public void testConsumeFromLatestCommit() throws Exception {
+    // write 2 commits first, and all the splits should come from the second commit.
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
       harness.setup();
@@ -84,8 +86,36 @@ public class TestStreamReadMonitoringFunction {
      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
           sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
-      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
-          "No instants should have range limit");
+
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
+          "All the splits should carry the latestCommit instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromLastCommit() throws Exception {
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All instants should have range limit");
 
       Thread.sleep(1000L);
 
@@ -163,8 +193,8 @@ public class TestStreamReadMonitoringFunction {
      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
           sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
-      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
-          "No instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All instants should have range limit");
 
     }
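
The flipped assertions above rely on every split now carrying an instant range, and on RangeType.CLOSE_CLOSE making both boundaries inclusive, so records written by the latest completed commit itself are still read. A small sketch of that semantics, assuming InstantRange exposes an isInRange check (an assumption about its API at the time; the timestamps are placeholders):

    // Sketch only: CLOSE_CLOSE includes both endpoints, so the range
    // [latestCommit, instantToIssue] covers the latest commit's own records.
    InstantRange range = InstantRange.getInstance("20210730000001", "20210730000002",
        InstantRange.RangeType.CLOSE_CLOSE);
    boolean includesStart = range.isInRange("20210730000001"); // expected true: start inclusive
    boolean includesEnd = range.isInRange("20210730000002");   // expected true: end inclusive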
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 1ddcb74..c1813dc 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -95,7 +95,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
-  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+  void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -109,6 +109,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
 
+    String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), firstCommit);
+    streamTableEnv.executeSql("drop table t1");
+    hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
     List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
 
@@ -120,6 +125,32 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    // reading from the latest commit instant.
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, insertInto);
+    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
   void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
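
Taken together, the two tests above pin down the user-facing contract: with no start commit configured, a streaming read of t1 returns only the rows of the latest commit (DATA_SET_SOURCE_INSERT_LATEST_COMMIT), while testStreamWriteAndReadFromSpecifiedCommit shows that setting the start-commit option restores the full replay. Spelled out by hand instead of through TestConfigurations, the DDL would look roughly like the sketch below; the literal option keys 'read.streaming.enabled' and 'read.streaming.start-commit' are assumptions about what FlinkOptions.READ_AS_STREAMING and FlinkOptions.READ_STREAMING_START_COMMIT resolve to, and the schema and path are placeholders:

    // Sketch only: hand-written equivalent of the generated DDL; omit
    // 'read.streaming.start-commit' to get the new latest-instant default.
    streamTableEnv.executeSql(
        "create table t1 ("
            + "  uuid varchar(20), name varchar(10), age int, ts timestamp(3), `partition` varchar(20)"
            + ") partitioned by (`partition`) with ("
            + "  'connector' = 'hudi',"
            + "  'path' = '/path/to/hudi/table',"
            + "  'read.streaming.enabled' = 'true'"
            + ")");
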
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b85c35b..5ddb99c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -160,6 +160,18 @@ public class TestData {
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  // data set of the latest commit of test_source.data.
+  public static List<RowData> DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 
18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 
20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   // merged data set of test_source.data and test_source_2.data
   public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
24,
