This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b91c48a27 [flink] Add tests for scan.mode=from-snapshot-full (#1438)
b91c48a27 is described below
commit b91c48a27a383b40d088c8a2883b5192b05cb65b
Author: GuojunLi <[email protected]>
AuthorDate: Tue Jun 27 15:20:07 2023 +0800
[flink] Add tests for scan.mode=from-snapshot-full (#1438)
---
.../apache/paimon/flink/BatchFileStoreITCase.java | 26 ++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 46 +++++++++++++
.../apache/paimon/flink/BatchFileStoreITCase.java | 26 ++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 46 +++++++++++++
.../apache/paimon/flink/BatchFileStoreITCase.java | 27 ++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 76 ++++++++++++++++++++--
6 files changed, 243 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 121810f4c..5d61204e5 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -71,8 +71,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='1') */"))
.containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
+ .isEmpty();
assertThat(
batchSql(
@@ -87,6 +95,14 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.of(2, 22, 222),
Row.of(3, 33, 333),
Row.of(4, 44, 444));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444));
assertThat(
batchSql(
String.format(
@@ -106,6 +122,16 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
Row.of(4, 44, 444),
Row.of(5, 55, 555),
Row.of(6, 66, 666));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444),
+ Row.of(5, 55, 555),
+ Row.of(6, 66, 666));
assertThat(
batchSql(
String.format(
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 080bab2e6..5afbc5db3 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -250,6 +250,52 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
.hasMessageContaining("Unable to create a source for reading
table");
}
+ @Test
+ public void testConfigureStartupSnapshotFull() throws Exception {
+ // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+
+ // Start from earliest snapshot
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 1));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
+ iterator.close();
+
+ batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 2));
+ assertThat(iterator.collect(4))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"),
+ Row.of("4", "5", "6"),
+ Row.of("7", "8", "9"),
+ Row.of("10", "11", "12"));
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 2));
+ assertThat(iterator.collect(5))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"),
+ Row.of("4", "5", "6"),
+ Row.of("7", "8", "9"),
+ Row.of("10", "11", "12"),
+ Row.of("13", "14", "15"));
+ iterator.close();
+ }
+
@Test
public void testIgnoreOverwrite() throws Exception {
BlockingIterator<Row, Row> iterator =
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 121810f4c..5d61204e5 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -71,8 +71,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='1') */"))
.containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
+ .isEmpty();
assertThat(
batchSql(
@@ -87,6 +95,14 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.of(2, 22, 222),
Row.of(3, 33, 333),
Row.of(4, 44, 444));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444));
assertThat(
batchSql(
String.format(
@@ -106,6 +122,16 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
Row.of(4, 44, 444),
Row.of(5, 55, 555),
Row.of(6, 66, 666));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444),
+ Row.of(5, 55, 555),
+ Row.of(6, 66, 666));
assertThat(
batchSql(
String.format(
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 080bab2e6..5afbc5db3 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -250,6 +250,52 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
.hasMessageContaining("Unable to create a source for reading
table");
}
+ @Test
+ public void testConfigureStartupSnapshotFull() throws Exception {
+ // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+
+ // Start from earliest snapshot
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 1));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
+ iterator.close();
+
+ batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 2));
+ assertThat(iterator.collect(4))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"),
+ Row.of("4", "5", "6"),
+ Row.of("7", "8", "9"),
+ Row.of("10", "11", "12"));
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 2));
+ assertThat(iterator.collect(5))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"),
+ Row.of("4", "5", "6"),
+ Row.of("7", "8", "9"),
+ Row.of("10", "11", "12"),
+ Row.of("13", "14", "15"));
+ iterator.close();
+ }
+
@Test
public void testIgnoreOverwrite() throws Exception {
BlockingIterator<Row, Row> iterator =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index a5a7acad4..928717d3c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -67,7 +67,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='1') */"))
.containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
+ .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
+
assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
+ .isEmpty();
assertThat(
batchSql(
@@ -82,6 +91,14 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.of(2, 22, 222),
Row.of(3, 33, 333),
Row.of(4, 44, 444));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444));
assertThat(
batchSql(
String.format(
@@ -101,6 +118,16 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
Row.of(4, 44, 444),
Row.of(5, 55, 555),
Row.of(6, 66, 666));
+ assertThat(
+ batchSql(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111),
+ Row.of(2, 22, 222),
+ Row.of(3, 33, 333),
+ Row.of(4, 44, 444),
+ Row.of(5, 55, 555),
+ Row.of(6, 66, 666));
assertThat(
batchSql(
String.format(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 64f72f828..f8063c941 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -58,7 +58,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@Parameters(name = "changelogFile-{0}")
public static Collection<Boolean> parameters() {
- return Arrays.asList(true, false);
+ return Arrays.asList(true);
}
@Override
@@ -146,10 +146,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')",
table);
+ batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+ // batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
iterator.close();
}
@@ -217,6 +218,11 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
Row.of("7", "8", "9"), Row.of("10", "11", "12"),
Row.of("13", "14", "15"));
iterator.close();
+ // after second snapshot
+ iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis()
+ 1));
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("13",
"14", "15"));
+ iterator.close();
+
// from start
iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() -
1));
assertThat(iterator.collect(5))
@@ -281,12 +287,12 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@TestTemplate
public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
iterator.close();
@@ -300,6 +306,22 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
iterator.close();
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 2));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("7", "8", "9"),
(Row.of("10", "11", "12")));
+ iterator.close();
+
// Configure 'scan.snapshot-id' with 'scan.mode=latest'.
assertThatThrownBy(
() ->
@@ -311,6 +333,52 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
"scan.snapshot-id must be null when you use latest for
scan.mode");
}
+ @TestTemplate
+ public void testConfigureStartupSnapshotFull() throws Exception {
+ // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+
+ // Start from earliest snapshot
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 1));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
+ iterator.close();
+
+ batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 2));
+ assertThat(iterator.collect(4))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"),
+ Row.of("4", "5", "6"),
+ Row.of("7", "8", "9"),
+ Row.of("10", "11", "12"));
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+ 2));
+ assertThat(iterator.collect(5))
+ .containsExactlyInAnyOrder(
+ Row.of("1", "2", "3"),
+ Row.of("4", "5", "6"),
+ Row.of("7", "8", "9"),
+ Row.of("10", "11", "12"),
+ Row.of("13", "14", "15"));
+ iterator.close();
+ }
+
@TestTemplate
public void testIgnoreOverwrite() throws Exception {
BlockingIterator<Row, Row> iterator =