This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 22346fcde [flink] Continuous file store IT Cases with parameterized (#1436)
22346fcde is described below

commit 22346fcde75509dd900788d57a4cc3268dc71b0f
Author: GuojunLi <[email protected]>
AuthorDate: Mon Jun 26 23:18:17 2023 +0800

    [flink] Continuous file store IT Cases with parameterized (#1436)
---
 .../paimon/flink/ContinuousFileStoreITCase.java    | 259 +++------------------
 .../paimon/flink/ContinuousFileStoreITCase.java    | 259 +++------------------
 2 files changed, 56 insertions(+), 462 deletions(-)

diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index ed601067f..080bab2e6 100644
--- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -27,9 +27,12 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.types.Row;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
@@ -37,15 +40,26 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** SQL ITCase for continuous file store. */
+@RunWith(Parameterized.class)
 public class ContinuousFileStoreITCase extends CatalogITCaseBase {
 
     private boolean changelogFile;
 
-    public ContinuousFileStoreITCase() {}
+    public ContinuousFileStoreITCase(boolean changelogFile) {
+        this.changelogFile = changelogFile;
+    }
+
+    @Parameterized.Parameters(name = "changelogFile: {0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(true, false);
+    }
 
     @Override
     protected List<String> ddl() {
-        String options = changelogFile ? " WITH('changelog-producer'='input')" 
: "";
+        String options =
+                changelogFile
+                        ? " WITH('write-mode'='change-log','changelog-producer'='input')"
+                        : "";
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" 
+ options,
                 "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
@@ -53,50 +67,22 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testWithoutPrimaryKeyChangelogFileTrue() throws Exception {
-        changelogFile = true;
-        testSimple("T1");
-    }
-
-    @Test
-    public void testWithoutPrimaryKeyChangelogFileFalse() throws Exception {
-        changelogFile = false;
+    public void testWithoutPrimaryKey() throws Exception {
         testSimple("T1");
     }
 
     @Test
-    public void testWithPrimaryKeyChangelogFileTrue() throws Exception {
-        changelogFile = true;
-        testSimple("T2");
-    }
-
-    @Test
-    public void testWithPrimaryKeyChangelogFileFalse() throws Exception {
-        changelogFile = false;
+    public void testWithPrimaryKey() throws Exception {
         testSimple("T2");
     }
 
     @Test
-    public void testProjectionWithoutPrimaryKeyChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
+    public void testProjectionWithoutPrimaryKey() throws Exception {
         testProjection("T1");
     }
 
     @Test
-    public void testProjectionWithoutPrimaryKeyChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
-        testProjection("T1");
-    }
-
-    @Test
-    public void testProjectionWithPrimaryKeyChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        testProjection("T2");
-    }
-
-    @Test
-    public void testProjectionWithPrimaryKeyChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testProjectionWithPrimaryKey() throws Exception {
         testProjection("T2");
     }
 
@@ -127,23 +113,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testContinuousLatestChangelogFileTrue() throws Exception {
-        changelogFile = true;
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(
-                        streamSqlIter("SELECT * FROM T1 /*+ 
OPTIONS('log.scan'='latest') */"));
-
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
-        iterator.close();
-    }
-
-    @Test
-    public void testContinuousLatestChangelogFileFalse() throws Exception {
-        changelogFile = false;
+    public void testContinuousLatest() throws Exception {
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
         BlockingIterator<Row, Row> iterator =
@@ -157,70 +127,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testContinuousFromTimestampChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        String sql =
-                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 
'log.scan.timestamp-millis'='%s') */";
-
-        // empty table
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(streamSqlIter(sql, 0));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory("T1"));
-        List<Snapshot> snapshots =
-                new 
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
-        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
-        Snapshot first = snapshots.get(0);
-        Snapshot second = snapshots.get(1);
-
-        // before second snapshot
-        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() 
- 1));
-        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
-        assertThat(iterator.collect(3))
-                .containsExactlyInAnyOrder(
-                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), 
Row.of("13", "14", "15"));
-        iterator.close();
-
-        // from second snapshot
-        iterator = BlockingIterator.of(streamSqlIter(sql, 
second.timeMillis()));
-        assertThat(iterator.collect(3))
-                .containsExactlyInAnyOrder(
-                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), 
Row.of("13", "14", "15"));
-        iterator.close();
-
-        // from start
-        iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 
1));
-        assertThat(iterator.collect(5))
-                .containsExactlyInAnyOrder(
-                        Row.of("1", "2", "3"),
-                        Row.of("4", "5", "6"),
-                        Row.of("7", "8", "9"),
-                        Row.of("10", "11", "12"),
-                        Row.of("13", "14", "15"));
-        iterator.close();
-
-        // from end
-        iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                sql,
-                                snapshotManager
-                                                
.snapshot(snapshotManager.latestSnapshotId())
-                                                .timeMillis()
-                                        + 1));
-        batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
-        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("16", 
"17", "18"));
-        iterator.close();
-    }
-
-    @Test
-    public void testContinuousFromTimestampChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testContinuousFromTimestamp() throws Exception {
         String sql =
                 "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 
'log.scan.timestamp-millis'='%s') */";
 
@@ -281,8 +188,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testLackStartupTimestampChangelogFileTrue() {
-        changelogFile = true;
+    public void testLackStartupTimestamp() {
         assertThatThrownBy(
                         () ->
                                 streamSqlIter(
@@ -291,42 +197,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testLackStartupTimestampChangelogFileFalse() {
-        changelogFile = false;
-        assertThatThrownBy(
-                        () ->
-                                streamSqlIter(
-                                        "SELECT * FROM T1 /*+ 
OPTIONS('log.scan'='from-timestamp') */"))
-                .hasMessageContaining("Unable to create a source for reading 
table");
-    }
-
-    @Test
-    public void testConfigureStartupTimestampChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        // Configure 'log.scan.timestamp-millis' without 'log.scan'.
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('log.scan.timestamp-millis'='%s') */",
-                                0));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        // Configure 'log.scan.timestamp-millis' with 'log.scan=latest'.
-        assertThatThrownBy(
-                        () ->
-                                streamSqlIter(
-                                        "SELECT * FROM T1 /*+ 
OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */",
-                                        0))
-                .hasMessageContaining("Unable to create a source for reading 
table");
-    }
-
-    @Test
-    public void testConfigureStartupTimestampChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testConfigureStartupTimestamp() throws Exception {
         // Configure 'log.scan.timestamp-millis' without 'log.scan'.
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
@@ -349,40 +220,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testConfigureStartupSnapshotChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        // Configure 'scan.snapshot-id' without 'scan.mode'.
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 1));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        // Start from earliest snapshot
-        iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 0));
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
-        assertThatThrownBy(
-                        () ->
-                                streamSqlIter(
-                                        "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
-                                        0))
-                .hasMessageContaining("Unable to create a source for reading 
table");
-    }
-
-    @Test
-    public void testConfigureStartupSnapshotChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testConfigureStartupSnapshot() throws Exception {
         // Configure 'scan.snapshot-id' without 'scan.mode'.
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
@@ -413,26 +251,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testIgnoreOverwriteChangelogTrue() throws Exception {
-        changelogFile = true;
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
-
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-
-        // should ignore this overwrite
-        batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");
-
-        batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
-        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
-        iterator.close();
-    }
-
-    @Test
-    public void testIgnoreOverwriteChangelogFalse() throws Exception {
-        changelogFile = false;
+    public void testIgnoreOverwrite() throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
 
@@ -449,8 +268,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testUnsupportedUpsertChangelogFileTrue() {
-        changelogFile = true;
+    public void testUnsupportedUpsert() {
         assertThatThrownBy(
                 () ->
                         streamSqlIter(
@@ -459,28 +277,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testUnsupportedUpsertChangelogFileFalse() {
-        changelogFile = false;
-        assertThatThrownBy(
-                () ->
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"),
-                "File store continuous reading does not support upsert 
changelog mode");
-    }
-
-    @Test
-    public void testUnsupportedEventualChangelogFileTrue() {
-        changelogFile = true;
-        assertThatThrownBy(
-                () ->
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
-                "File store continuous reading does not support eventual 
consistency mode");
-    }
-
-    @Test
-    public void testUnsupportedEventualChangelogFileFalse() {
-        changelogFile = false;
+    public void testUnsupportedEventual() {
         assertThatThrownBy(
                 () ->
                         streamSqlIter(
diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index ed601067f..080bab2e6 100644
--- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -27,9 +27,12 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.types.Row;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
@@ -37,15 +40,26 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** SQL ITCase for continuous file store. */
+@RunWith(Parameterized.class)
 public class ContinuousFileStoreITCase extends CatalogITCaseBase {
 
     private boolean changelogFile;
 
-    public ContinuousFileStoreITCase() {}
+    public ContinuousFileStoreITCase(boolean changelogFile) {
+        this.changelogFile = changelogFile;
+    }
+
+    @Parameterized.Parameters(name = "changelogFile: {0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(true, false);
+    }
 
     @Override
     protected List<String> ddl() {
-        String options = changelogFile ? " WITH('changelog-producer'='input')" 
: "";
+        String options =
+                changelogFile
+                        ? " WITH('write-mode'='change-log','changelog-producer'='input')"
+                        : "";
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" 
+ options,
                 "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
@@ -53,50 +67,22 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testWithoutPrimaryKeyChangelogFileTrue() throws Exception {
-        changelogFile = true;
-        testSimple("T1");
-    }
-
-    @Test
-    public void testWithoutPrimaryKeyChangelogFileFalse() throws Exception {
-        changelogFile = false;
+    public void testWithoutPrimaryKey() throws Exception {
         testSimple("T1");
     }
 
     @Test
-    public void testWithPrimaryKeyChangelogFileTrue() throws Exception {
-        changelogFile = true;
-        testSimple("T2");
-    }
-
-    @Test
-    public void testWithPrimaryKeyChangelogFileFalse() throws Exception {
-        changelogFile = false;
+    public void testWithPrimaryKey() throws Exception {
         testSimple("T2");
     }
 
     @Test
-    public void testProjectionWithoutPrimaryKeyChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
+    public void testProjectionWithoutPrimaryKey() throws Exception {
         testProjection("T1");
     }
 
     @Test
-    public void testProjectionWithoutPrimaryKeyChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
-        testProjection("T1");
-    }
-
-    @Test
-    public void testProjectionWithPrimaryKeyChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        testProjection("T2");
-    }
-
-    @Test
-    public void testProjectionWithPrimaryKeyChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testProjectionWithPrimaryKey() throws Exception {
         testProjection("T2");
     }
 
@@ -127,23 +113,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testContinuousLatestChangelogFileTrue() throws Exception {
-        changelogFile = true;
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(
-                        streamSqlIter("SELECT * FROM T1 /*+ 
OPTIONS('log.scan'='latest') */"));
-
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", 
"11", "12"));
-        iterator.close();
-    }
-
-    @Test
-    public void testContinuousLatestChangelogFileFalse() throws Exception {
-        changelogFile = false;
+    public void testContinuousLatest() throws Exception {
         batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
 
         BlockingIterator<Row, Row> iterator =
@@ -157,70 +127,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testContinuousFromTimestampChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        String sql =
-                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 
'log.scan.timestamp-millis'='%s') */";
-
-        // empty table
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(streamSqlIter(sql, 0));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        SnapshotManager snapshotManager =
-                new SnapshotManager(LocalFileIO.create(), 
getTableDirectory("T1"));
-        List<Snapshot> snapshots =
-                new 
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
-        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
-        Snapshot first = snapshots.get(0);
-        Snapshot second = snapshots.get(1);
-
-        // before second snapshot
-        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() 
- 1));
-        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
-        assertThat(iterator.collect(3))
-                .containsExactlyInAnyOrder(
-                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), 
Row.of("13", "14", "15"));
-        iterator.close();
-
-        // from second snapshot
-        iterator = BlockingIterator.of(streamSqlIter(sql, 
second.timeMillis()));
-        assertThat(iterator.collect(3))
-                .containsExactlyInAnyOrder(
-                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), 
Row.of("13", "14", "15"));
-        iterator.close();
-
-        // from start
-        iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 
1));
-        assertThat(iterator.collect(5))
-                .containsExactlyInAnyOrder(
-                        Row.of("1", "2", "3"),
-                        Row.of("4", "5", "6"),
-                        Row.of("7", "8", "9"),
-                        Row.of("10", "11", "12"),
-                        Row.of("13", "14", "15"));
-        iterator.close();
-
-        // from end
-        iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                sql,
-                                snapshotManager
-                                                
.snapshot(snapshotManager.latestSnapshotId())
-                                                .timeMillis()
-                                        + 1));
-        batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
-        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("16", 
"17", "18"));
-        iterator.close();
-    }
-
-    @Test
-    public void testContinuousFromTimestampChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testContinuousFromTimestamp() throws Exception {
         String sql =
                 "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 
'log.scan.timestamp-millis'='%s') */";
 
@@ -281,8 +188,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testLackStartupTimestampChangelogFileTrue() {
-        changelogFile = true;
+    public void testLackStartupTimestamp() {
         assertThatThrownBy(
                         () ->
                                 streamSqlIter(
@@ -291,42 +197,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testLackStartupTimestampChangelogFileFalse() {
-        changelogFile = false;
-        assertThatThrownBy(
-                        () ->
-                                streamSqlIter(
-                                        "SELECT * FROM T1 /*+ 
OPTIONS('log.scan'='from-timestamp') */"))
-                .hasMessageContaining("Unable to create a source for reading 
table");
-    }
-
-    @Test
-    public void testConfigureStartupTimestampChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        // Configure 'log.scan.timestamp-millis' without 'log.scan'.
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('log.scan.timestamp-millis'='%s') */",
-                                0));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        // Configure 'log.scan.timestamp-millis' with 'log.scan=latest'.
-        assertThatThrownBy(
-                        () ->
-                                streamSqlIter(
-                                        "SELECT * FROM T1 /*+ 
OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */",
-                                        0))
-                .hasMessageContaining("Unable to create a source for reading 
table");
-    }
-
-    @Test
-    public void testConfigureStartupTimestampChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testConfigureStartupTimestamp() throws Exception {
         // Configure 'log.scan.timestamp-millis' without 'log.scan'.
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
@@ -349,40 +220,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testConfigureStartupSnapshotChangelogFileTrue() throws 
Exception {
-        changelogFile = true;
-        // Configure 'scan.snapshot-id' without 'scan.mode'.
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 1));
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        // Start from earliest snapshot
-        iterator =
-                BlockingIterator.of(
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('scan.snapshot-id'='%s') */", 0));
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-        iterator.close();
-
-        // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
-        assertThatThrownBy(
-                        () ->
-                                streamSqlIter(
-                                        "SELECT * FROM T1 /*+ 
OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
-                                        0))
-                .hasMessageContaining("Unable to create a source for reading 
table");
-    }
-
-    @Test
-    public void testConfigureStartupSnapshotChangelogFileFalse() throws 
Exception {
-        changelogFile = false;
+    public void testConfigureStartupSnapshot() throws Exception {
         // Configure 'scan.snapshot-id' without 'scan.mode'.
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(
@@ -413,26 +251,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testIgnoreOverwriteChangelogTrue() throws Exception {
-        changelogFile = true;
-        BlockingIterator<Row, Row> iterator =
-                BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
-
-        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", 
"5", "6"));
-
-        // should ignore this overwrite
-        batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");
-
-        batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
-        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9", 
"10", "11"));
-        iterator.close();
-    }
-
-    @Test
-    public void testIgnoreOverwriteChangelogFalse() throws Exception {
-        changelogFile = false;
+    public void testIgnoreOverwrite() throws Exception {
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
 
@@ -449,8 +268,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testUnsupportedUpsertChangelogFileTrue() {
-        changelogFile = true;
+    public void testUnsupportedUpsert() {
         assertThatThrownBy(
                 () ->
                         streamSqlIter(
@@ -459,28 +277,7 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testUnsupportedUpsertChangelogFileFalse() {
-        changelogFile = false;
-        assertThatThrownBy(
-                () ->
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('log.changelog-mode'='upsert') */"),
-                "File store continuous reading does not support upsert 
changelog mode");
-    }
-
-    @Test
-    public void testUnsupportedEventualChangelogFileTrue() {
-        changelogFile = true;
-        assertThatThrownBy(
-                () ->
-                        streamSqlIter(
-                                "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
-                "File store continuous reading does not support eventual 
consistency mode");
-    }
-
-    @Test
-    public void testUnsupportedEventualChangelogFileFalse() {
-        changelogFile = false;
+    public void testUnsupportedEventual() {
         assertThatThrownBy(
                 () ->
                         streamSqlIter(

Reply via email to