This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 22346fcde [flink] Continuous file store IT Cases with parameterized
(#1436)
22346fcde is described below
commit 22346fcde75509dd900788d57a4cc3268dc71b0f
Author: GuojunLi <[email protected]>
AuthorDate: Mon Jun 26 23:18:17 2023 +0800
[flink] Continuous file store IT Cases with parameterized (#1436)
---
.../paimon/flink/ContinuousFileStoreITCase.java | 259 +++------------------
.../paimon/flink/ContinuousFileStoreITCase.java | 259 +++------------------
2 files changed, 56 insertions(+), 462 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index ed601067f..080bab2e6 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -27,9 +27,12 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.types.Row;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -37,15 +40,26 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** SQL ITCase for continuous file store. */
+@RunWith(Parameterized.class)
public class ContinuousFileStoreITCase extends CatalogITCaseBase {
private boolean changelogFile;
- public ContinuousFileStoreITCase() {}
+ public ContinuousFileStoreITCase(boolean changelogFile) {
+ this.changelogFile = changelogFile;
+ }
+
+ @Parameterized.Parameters(name = "changelogFile: {0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(true, false);
+ }
@Override
protected List<String> ddl() {
- String options = changelogFile ? " WITH('changelog-producer'='input')"
: "";
+ String options =
+ changelogFile
+ ? "
WITH('write-mode'='change-log','changelog-producer'='input')"
+ : "";
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)"
+ options,
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
@@ -53,50 +67,22 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testWithoutPrimaryKeyChangelogFileTrue() throws Exception {
- changelogFile = true;
- testSimple("T1");
- }
-
- @Test
- public void testWithoutPrimaryKeyChangelogFileFalse() throws Exception {
- changelogFile = false;
+ public void testWithoutPrimaryKey() throws Exception {
testSimple("T1");
}
@Test
- public void testWithPrimaryKeyChangelogFileTrue() throws Exception {
- changelogFile = true;
- testSimple("T2");
- }
-
- @Test
- public void testWithPrimaryKeyChangelogFileFalse() throws Exception {
- changelogFile = false;
+ public void testWithPrimaryKey() throws Exception {
testSimple("T2");
}
@Test
- public void testProjectionWithoutPrimaryKeyChangelogFileTrue() throws
Exception {
- changelogFile = true;
+ public void testProjectionWithoutPrimaryKey() throws Exception {
testProjection("T1");
}
@Test
- public void testProjectionWithoutPrimaryKeyChangelogFileFalse() throws
Exception {
- changelogFile = false;
- testProjection("T1");
- }
-
- @Test
- public void testProjectionWithPrimaryKeyChangelogFileTrue() throws
Exception {
- changelogFile = true;
- testProjection("T2");
- }
-
- @Test
- public void testProjectionWithPrimaryKeyChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testProjectionWithPrimaryKey() throws Exception {
testProjection("T2");
}
@@ -127,23 +113,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testContinuousLatestChangelogFileTrue() throws Exception {
- changelogFile = true;
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(
- streamSqlIter("SELECT * FROM T1 /*+
OPTIONS('log.scan'='latest') */"));
-
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
- iterator.close();
- }
-
- @Test
- public void testContinuousLatestChangelogFileFalse() throws Exception {
- changelogFile = false;
+ public void testContinuousLatest() throws Exception {
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
BlockingIterator<Row, Row> iterator =
@@ -157,70 +127,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testContinuousFromTimestampChangelogFileTrue() throws
Exception {
- changelogFile = true;
- String sql =
- "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp',
'log.scan.timestamp-millis'='%s') */";
-
- // empty table
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter(sql, 0));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(),
getTableDirectory("T1"));
- List<Snapshot> snapshots =
- new
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
- snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
- Snapshot first = snapshots.get(0);
- Snapshot second = snapshots.get(1);
-
- // before second snapshot
- iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis()
- 1));
- batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
- assertThat(iterator.collect(3))
- .containsExactlyInAnyOrder(
- Row.of("7", "8", "9"), Row.of("10", "11", "12"),
Row.of("13", "14", "15"));
- iterator.close();
-
- // from second snapshot
- iterator = BlockingIterator.of(streamSqlIter(sql,
second.timeMillis()));
- assertThat(iterator.collect(3))
- .containsExactlyInAnyOrder(
- Row.of("7", "8", "9"), Row.of("10", "11", "12"),
Row.of("13", "14", "15"));
- iterator.close();
-
- // from start
- iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() -
1));
- assertThat(iterator.collect(5))
- .containsExactlyInAnyOrder(
- Row.of("1", "2", "3"),
- Row.of("4", "5", "6"),
- Row.of("7", "8", "9"),
- Row.of("10", "11", "12"),
- Row.of("13", "14", "15"));
- iterator.close();
-
- // from end
- iterator =
- BlockingIterator.of(
- streamSqlIter(
- sql,
- snapshotManager
-
.snapshot(snapshotManager.latestSnapshotId())
- .timeMillis()
- + 1));
- batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
- assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("16",
"17", "18"));
- iterator.close();
- }
-
- @Test
- public void testContinuousFromTimestampChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testContinuousFromTimestamp() throws Exception {
String sql =
"SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp',
'log.scan.timestamp-millis'='%s') */";
@@ -281,8 +188,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testLackStartupTimestampChangelogFileTrue() {
- changelogFile = true;
+ public void testLackStartupTimestamp() {
assertThatThrownBy(
() ->
streamSqlIter(
@@ -291,42 +197,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testLackStartupTimestampChangelogFileFalse() {
- changelogFile = false;
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.scan'='from-timestamp') */"))
- .hasMessageContaining("Unable to create a source for reading
table");
- }
-
- @Test
- public void testConfigureStartupTimestampChangelogFileTrue() throws
Exception {
- changelogFile = true;
- // Configure 'log.scan.timestamp-millis' without 'log.scan'.
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.scan.timestamp-millis'='%s') */",
- 0));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- // Configure 'log.scan.timestamp-millis' with 'log.scan=latest'.
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */",
- 0))
- .hasMessageContaining("Unable to create a source for reading
table");
- }
-
- @Test
- public void testConfigureStartupTimestampChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testConfigureStartupTimestamp() throws Exception {
// Configure 'log.scan.timestamp-millis' without 'log.scan'.
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
@@ -349,40 +220,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testConfigureStartupSnapshotChangelogFileTrue() throws
Exception {
- changelogFile = true;
- // Configure 'scan.snapshot-id' without 'scan.mode'.
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- // Start from earliest snapshot
- iterator =
- BlockingIterator.of(
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 0));
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
- 0))
- .hasMessageContaining("Unable to create a source for reading
table");
- }
-
- @Test
- public void testConfigureStartupSnapshotChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
@@ -413,26 +251,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testIgnoreOverwriteChangelogTrue() throws Exception {
- changelogFile = true;
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
-
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
-
- // should ignore this overwrite
- batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");
-
- batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
- assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
- iterator.close();
- }
-
- @Test
- public void testIgnoreOverwriteChangelogFalse() throws Exception {
- changelogFile = false;
+ public void testIgnoreOverwrite() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -449,8 +268,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testUnsupportedUpsertChangelogFileTrue() {
- changelogFile = true;
+ public void testUnsupportedUpsert() {
assertThatThrownBy(
() ->
streamSqlIter(
@@ -459,28 +277,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testUnsupportedUpsertChangelogFileFalse() {
- changelogFile = false;
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"),
- "File store continuous reading does not support upsert
changelog mode");
- }
-
- @Test
- public void testUnsupportedEventualChangelogFileTrue() {
- changelogFile = true;
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
- "File store continuous reading does not support eventual
consistency mode");
- }
-
- @Test
- public void testUnsupportedEventualChangelogFileFalse() {
- changelogFile = false;
+ public void testUnsupportedEventual() {
assertThatThrownBy(
() ->
streamSqlIter(
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index ed601067f..080bab2e6 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -27,9 +27,12 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.types.Row;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -37,15 +40,26 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** SQL ITCase for continuous file store. */
+@RunWith(Parameterized.class)
public class ContinuousFileStoreITCase extends CatalogITCaseBase {
private boolean changelogFile;
- public ContinuousFileStoreITCase() {}
+ public ContinuousFileStoreITCase(boolean changelogFile) {
+ this.changelogFile = changelogFile;
+ }
+
+ @Parameterized.Parameters(name = "changelogFile: {0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(true, false);
+ }
@Override
protected List<String> ddl() {
- String options = changelogFile ? " WITH('changelog-producer'='input')"
: "";
+ String options =
+ changelogFile
+ ? "
WITH('write-mode'='change-log','changelog-producer'='input')"
+ : "";
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)"
+ options,
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
@@ -53,50 +67,22 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testWithoutPrimaryKeyChangelogFileTrue() throws Exception {
- changelogFile = true;
- testSimple("T1");
- }
-
- @Test
- public void testWithoutPrimaryKeyChangelogFileFalse() throws Exception {
- changelogFile = false;
+ public void testWithoutPrimaryKey() throws Exception {
testSimple("T1");
}
@Test
- public void testWithPrimaryKeyChangelogFileTrue() throws Exception {
- changelogFile = true;
- testSimple("T2");
- }
-
- @Test
- public void testWithPrimaryKeyChangelogFileFalse() throws Exception {
- changelogFile = false;
+ public void testWithPrimaryKey() throws Exception {
testSimple("T2");
}
@Test
- public void testProjectionWithoutPrimaryKeyChangelogFileTrue() throws
Exception {
- changelogFile = true;
+ public void testProjectionWithoutPrimaryKey() throws Exception {
testProjection("T1");
}
@Test
- public void testProjectionWithoutPrimaryKeyChangelogFileFalse() throws
Exception {
- changelogFile = false;
- testProjection("T1");
- }
-
- @Test
- public void testProjectionWithPrimaryKeyChangelogFileTrue() throws
Exception {
- changelogFile = true;
- testProjection("T2");
- }
-
- @Test
- public void testProjectionWithPrimaryKeyChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testProjectionWithPrimaryKey() throws Exception {
testProjection("T2");
}
@@ -127,23 +113,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testContinuousLatestChangelogFileTrue() throws Exception {
- changelogFile = true;
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
-
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(
- streamSqlIter("SELECT * FROM T1 /*+
OPTIONS('log.scan'='latest') */"));
-
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10",
"11", "12"));
- iterator.close();
- }
-
- @Test
- public void testContinuousLatestChangelogFileFalse() throws Exception {
- changelogFile = false;
+ public void testContinuousLatest() throws Exception {
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
BlockingIterator<Row, Row> iterator =
@@ -157,70 +127,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testContinuousFromTimestampChangelogFileTrue() throws
Exception {
- changelogFile = true;
- String sql =
- "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp',
'log.scan.timestamp-millis'='%s') */";
-
- // empty table
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter(sql, 0));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- SnapshotManager snapshotManager =
- new SnapshotManager(LocalFileIO.create(),
getTableDirectory("T1"));
- List<Snapshot> snapshots =
- new
ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
- snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
- Snapshot first = snapshots.get(0);
- Snapshot second = snapshots.get(1);
-
- // before second snapshot
- iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis()
- 1));
- batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
- assertThat(iterator.collect(3))
- .containsExactlyInAnyOrder(
- Row.of("7", "8", "9"), Row.of("10", "11", "12"),
Row.of("13", "14", "15"));
- iterator.close();
-
- // from second snapshot
- iterator = BlockingIterator.of(streamSqlIter(sql,
second.timeMillis()));
- assertThat(iterator.collect(3))
- .containsExactlyInAnyOrder(
- Row.of("7", "8", "9"), Row.of("10", "11", "12"),
Row.of("13", "14", "15"));
- iterator.close();
-
- // from start
- iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() -
1));
- assertThat(iterator.collect(5))
- .containsExactlyInAnyOrder(
- Row.of("1", "2", "3"),
- Row.of("4", "5", "6"),
- Row.of("7", "8", "9"),
- Row.of("10", "11", "12"),
- Row.of("13", "14", "15"));
- iterator.close();
-
- // from end
- iterator =
- BlockingIterator.of(
- streamSqlIter(
- sql,
- snapshotManager
-
.snapshot(snapshotManager.latestSnapshotId())
- .timeMillis()
- + 1));
- batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
- assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("16",
"17", "18"));
- iterator.close();
- }
-
- @Test
- public void testContinuousFromTimestampChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testContinuousFromTimestamp() throws Exception {
String sql =
"SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp',
'log.scan.timestamp-millis'='%s') */";
@@ -281,8 +188,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testLackStartupTimestampChangelogFileTrue() {
- changelogFile = true;
+ public void testLackStartupTimestamp() {
assertThatThrownBy(
() ->
streamSqlIter(
@@ -291,42 +197,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testLackStartupTimestampChangelogFileFalse() {
- changelogFile = false;
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.scan'='from-timestamp') */"))
- .hasMessageContaining("Unable to create a source for reading
table");
- }
-
- @Test
- public void testConfigureStartupTimestampChangelogFileTrue() throws
Exception {
- changelogFile = true;
- // Configure 'log.scan.timestamp-millis' without 'log.scan'.
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.scan.timestamp-millis'='%s') */",
- 0));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- // Configure 'log.scan.timestamp-millis' with 'log.scan=latest'.
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */",
- 0))
- .hasMessageContaining("Unable to create a source for reading
table");
- }
-
- @Test
- public void testConfigureStartupTimestampChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testConfigureStartupTimestamp() throws Exception {
// Configure 'log.scan.timestamp-millis' without 'log.scan'.
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
@@ -349,40 +220,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testConfigureStartupSnapshotChangelogFileTrue() throws
Exception {
- changelogFile = true;
- // Configure 'scan.snapshot-id' without 'scan.mode'.
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- // Start from earliest snapshot
- iterator =
- BlockingIterator.of(
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 0));
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
- iterator.close();
-
- // Configure 'scan.snapshot-id' with 'scan.mode=latest'.
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
- 0))
- .hasMessageContaining("Unable to create a source for reading
table");
- }
-
- @Test
- public void testConfigureStartupSnapshotChangelogFileFalse() throws
Exception {
- changelogFile = false;
+ public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
@@ -413,26 +251,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testIgnoreOverwriteChangelogTrue() throws Exception {
- changelogFile = true;
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
-
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
-
- // should ignore this overwrite
- batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')");
-
- batchSql("INSERT INTO T1 VALUES ('9', '10', '11')");
- assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("9",
"10", "11"));
- iterator.close();
- }
-
- @Test
- public void testIgnoreOverwriteChangelogFalse() throws Exception {
- changelogFile = false;
+ public void testIgnoreOverwrite() throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(streamSqlIter("SELECT * FROM T1"));
@@ -449,8 +268,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testUnsupportedUpsertChangelogFileTrue() {
- changelogFile = true;
+ public void testUnsupportedUpsert() {
assertThatThrownBy(
() ->
streamSqlIter(
@@ -459,28 +277,7 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testUnsupportedUpsertChangelogFileFalse() {
- changelogFile = false;
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.changelog-mode'='upsert') */"),
- "File store continuous reading does not support upsert
changelog mode");
- }
-
- @Test
- public void testUnsupportedEventualChangelogFileTrue() {
- changelogFile = true;
- assertThatThrownBy(
- () ->
- streamSqlIter(
- "SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
- "File store continuous reading does not support eventual
consistency mode");
- }
-
- @Test
- public void testUnsupportedEventualChangelogFileFalse() {
- changelogFile = false;
+ public void testUnsupportedEventual() {
assertThatThrownBy(
() ->
streamSqlIter(