This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b91c48a27 [flink] Add tests for scan.mode=from-snapshot-full (#1438)
b91c48a27 is described below

commit b91c48a27a383b40d088c8a2883b5192b05cb65b
Author: GuojunLi <[email protected]>
AuthorDate: Tue Jun 27 15:20:07 2023 +0800

    [flink] Add tests for scan.mode=from-snapshot-full (#1438)
---
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 26 ++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    | 46 +++++++++++++
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 26 ++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    | 46 +++++++++++++
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 27 ++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    | 76 ++++++++++++++++++++--
 6 files changed, 243 insertions(+), 4 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 121810f4c..5d61204e5 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -71,8 +71,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
 
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='1') */"))
                 .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
 
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
+                .isEmpty();
 
         assertThat(
                         batchSql(
@@ -87,6 +95,14 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                         Row.of(2, 22, 222),
                         Row.of(3, 33, 333),
                         Row.of(4, 44, 444));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444));
         assertThat(
                         batchSql(
                                 String.format(
@@ -106,6 +122,16 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                         Row.of(4, 44, 444),
                         Row.of(5, 55, 555),
                         Row.of(6, 66, 666));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444),
+                        Row.of(5, 55, 555),
+                        Row.of(6, 66, 666));
         assertThat(
                         batchSql(
                                 String.format(
diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 080bab2e6..5afbc5db3 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -250,6 +250,52 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 .hasMessageContaining("Unable to create a source for reading 
table");
     }
 
+    @Test
+    public void testConfigureStartupSnapshotFull() throws Exception {
+        // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+
+        // Start from earliest snapshot
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                1));
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
+        iterator.close();
+
+        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                2));
+        assertThat(iterator.collect(4))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"));
+        iterator.close();
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                2));
+        assertThat(iterator.collect(5))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"),
+                        Row.of("13", "14", "15"));
+        iterator.close();
+    }
+
     @Test
     public void testIgnoreOverwrite() throws Exception {
         BlockingIterator<Row, Row> iterator =
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 121810f4c..5d61204e5 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -71,8 +71,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
 
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='1') */"))
                 .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
 
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
+                .isEmpty();
 
         assertThat(
                         batchSql(
@@ -87,6 +95,14 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                         Row.of(2, 22, 222),
                         Row.of(3, 33, 333),
                         Row.of(4, 44, 444));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444));
         assertThat(
                         batchSql(
                                 String.format(
@@ -106,6 +122,16 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                         Row.of(4, 44, 444),
                         Row.of(5, 55, 555),
                         Row.of(6, 66, 666));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444),
+                        Row.of(5, 55, 555),
+                        Row.of(6, 66, 666));
         assertThat(
                         batchSql(
                                 String.format(
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 080bab2e6..5afbc5db3 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -250,6 +250,52 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 .hasMessageContaining("Unable to create a source for reading 
table");
     }
 
+    @Test
+    public void testConfigureStartupSnapshotFull() throws Exception {
+        // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+
+        // Start from earliest snapshot
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                1));
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
+        iterator.close();
+
+        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                2));
+        assertThat(iterator.collect(4))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"));
+        iterator.close();
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                2));
+        assertThat(iterator.collect(5))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"),
+                        Row.of("13", "14", "15"));
+        iterator.close();
+    }
+
     @Test
     public void testIgnoreOverwrite() throws Exception {
         BlockingIterator<Row, Row> iterator =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index a5a7acad4..928717d3c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -67,7 +67,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='1') */"))
                 .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
 
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
+
         assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='0') */")).isEmpty();
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */"))
+                .isEmpty();
 
         assertThat(
                         batchSql(
@@ -82,6 +91,14 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                         Row.of(2, 22, 222),
                         Row.of(3, 33, 333),
                         Row.of(4, 44, 444));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444));
         assertThat(
                         batchSql(
                                 String.format(
@@ -101,6 +118,16 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                         Row.of(4, 44, 444),
                         Row.of(5, 55, 555),
                         Row.of(6, 66, 666));
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444),
+                        Row.of(5, 55, 555),
+                        Row.of(6, 66, 666));
         assertThat(
                         batchSql(
                                 String.format(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 64f72f828..f8063c941 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -58,7 +58,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
     @Parameters(name = "changelogFile-{0}")
     public static Collection<Boolean> parameters() {
-        return Arrays.asList(true, false);
+        return Arrays.asList(true);
     }
 
     @Override
@@ -146,10 +146,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
 
         batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", 
table);
+        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
 
-        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
+        //        batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
         iterator.close();
     }
@@ -217,6 +218,11 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                         Row.of("7", "8", "9"), Row.of("10", "11", "12"), 
Row.of("13", "14", "15"));
         iterator.close();
 
+        // after second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() 
+ 1));
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("13", 
"14", "15"));
+        iterator.close();
+
         // from start
         iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 
1));
         assertThat(iterator.collect(5))
@@ -281,12 +287,12 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     @TestTemplate
     public void testConfigureStartupSnapshot() throws Exception {
         // Configure 'scan.snapshot-id' without 'scan.mode'.
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
                         streamSqlIter(
                                 "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 1));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
         assertThat(iterator.collect(2))
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
         iterator.close();
@@ -300,6 +306,22 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
         iterator.close();
 
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 1));
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
+        iterator.close();
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 2));
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("7", "8", "9"), 
(Row.of("10", "11", "12")));
+        iterator.close();
+
         // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
         assertThatThrownBy(
                         () ->
@@ -311,6 +333,52 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                         "scan.snapshot-id must be null when you use latest for 
scan.mode");
     }
 
+    @TestTemplate
+    public void testConfigureStartupSnapshotFull() throws Exception {
+        // Configure 'scan.snapshot-id' with 'scan.mode'='from-snapshot-full'.
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+
+        // Start from earliest snapshot
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                1));
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
+        iterator.close();
+
+        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                2));
+        assertThat(iterator.collect(4))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"));
+        iterator.close();
+
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */",
+                                2));
+        assertThat(iterator.collect(5))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"),
+                        Row.of("13", "14", "15"));
+        iterator.close();
+    }
+
     @TestTemplate
     public void testIgnoreOverwrite() throws Exception {
         BlockingIterator<Row, Row> iterator =

Reply via email to