This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8b19ec9  [HUDI-2252] Default consumes from the latest instant for flink streaming reader (#3368)
8b19ec9 is described below
commit 8b19ec9ca07070a9819502e82091dd14d559ef94
Author: swuferhong <[email protected]>
AuthorDate: Fri Jul 30 14:25:05 2021 +0800
    [HUDI-2252] Default consumes from the latest instant for flink streaming reader (#3368)
---
.../hudi/source/StreamReadMonitoringFunction.java | 7 ++--
.../source/TestStreamReadMonitoringFunction.java | 38 +++++++++++++++++++---
.../apache/hudi/table/HoodieDataSourceITCase.java | 33 ++++++++++++++++++-
.../test/java/org/apache/hudi/utils/TestData.java | 12 +++++++
4 files changed, 82 insertions(+), 8 deletions(-)
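
In practice, the change means a Flink streaming read that sets no start commit now begins from the latest completed instant instead of consuming the full snapshot plus the incremental data set. A hedged usage sketch follows; the table name, schema, path, and literal option keys are illustrative assumptions (the patch itself only references the FlinkOptions.READ_AS_STREAMING and FlinkOptions.READ_STREAMING_START_COMMIT constants):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class StreamingReadExample {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Hypothetical table: with no start commit configured, the streaming
        // source now issues splits starting from the latest completed instant.
        tEnv.executeSql(
            "create table t1 ("
                + "  uuid varchar(20), name varchar(10), age int, ts timestamp(3), `partition` varchar(20)"
                + ") partitioned by (`partition`) with ("
                + "  'connector' = 'hudi',"
                + "  'path' = '/tmp/hudi_table',"
                + "  'read.streaming.enabled' = 'true'"
                + ")");
        // To keep the previous behavior (snapshot plus incremental), pin the
        // start commit explicitly, e.g.:
        //   'read.streaming.start-commit' = '<earliest commit instant>'
        tEnv.executeSql("select * from t1").print();
      }
    }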
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 983c19f..92c06e9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -203,9 +203,10 @@ public class StreamReadMonitoringFunction
         instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
             InstantRange.RangeType.CLOSE_CLOSE);
       } else {
-        // first time consume and no start commit,
-        // would consume all the snapshot data PLUS incremental data set
-        instantRange = null;
+        // first time consume and no start commit, consumes the latest incremental data set.
+        HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+        instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
+            InstantRange.RangeType.CLOSE_CLOSE);
       }
     } else {
       LOG.info("No new instant found for the table under path " + path + ", skip reading");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 406da32..f145744 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -70,7 +70,9 @@ public class TestStreamReadMonitoringFunction {
   @Test
   public void testConsumeFromLatestCommit() throws Exception {
+    // write 2 commits first, and all the splits should come from the second commit.
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
       harness.setup();
@@ -84,8 +86,36 @@ public class TestStreamReadMonitoringFunction {
       assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
           sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
-      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
-          "No instants should have range limit");
+
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
+          "All the splits should be with latestCommit instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromLastCommit() throws Exception {
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All instants should have range limit");
+      Thread.sleep(1000L);
@@ -163,8 +193,8 @@ public class TestStreamReadMonitoringFunction {
       assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
           sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
-      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
-          "No instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All instants should have range limit");
     }
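
The new assertions depend on TestUtils.getLatestCommit, a helper that is not part of this patch. A hedged sketch of what such a helper plausibly looks like, assuming the HoodieTableMetaClient builder API of this release line:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    final class TimelineTestUtilSketch {
      // Returns the timestamp of the latest completed commit under basePath,
      // i.e. the instant the splits' ranges are expected to start from.
      static String getLatestCommit(String basePath) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(basePath)
            .build();
        return metaClient.getCommitsTimeline()
            .filterCompletedInstants()
            .lastInstant()
            .get()
            .getTimestamp();
      }
    }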
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 1ddcb74..c1813dc 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -95,7 +95,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
-  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+  void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -109,6 +109,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
+    String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), firstCommit);
+    streamTableEnv.executeSql("drop table t1");
+    hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
 
     List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
@@ -120,6 +125,32 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    // reading from the latest commit instant.
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, insertInto);
+    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
   void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b85c35b..5ddb99c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -160,6 +160,18 @@ public class TestData {
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  // data set of test_source.data latest commit.
+  public static List<RowData> DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   // merged data set of test_source.data and test_source_2.data
   public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,