This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ec5ebeb27 [ci] Fix paimon flink tests to cover submodule versions
(#2949)
ec5ebeb27 is described below
commit ec5ebeb27bd4e9f8a557cf2137eeee4f370ad8b2
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 6 17:36:50 2024 +0800
[ci] Fix paimon flink tests to cover submodule versions (#2949)
---
.github/workflows/utitcase-flink.yml | 7 ++++-
.../paimon/flink/ContinuousFileStoreITCase.java | 29 ++++++++++++++++----
.../paimon/flink/ContinuousFileStoreITCase.java | 29 ++++++++++++++++----
.../apache/paimon/flink/SchemaChangeITCase.java | 5 ++--
paimon-flink/paimon-flink-common/pom.xml | 29 ++------------------
paimon-flink/pom.xml | 31 ++++++++++++++++++++++
6 files changed, 90 insertions(+), 40 deletions(-)
diff --git a/.github/workflows/utitcase-flink.yml
b/.github/workflows/utitcase-flink.yml
index 3b543b3a2..a5c83366f 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -51,6 +51,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B clean install -pl
'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+ test_modules=""
+ for suffix in 1.14 1.15 1.16 1.17 1.18 common; do
+ test_modules+="org.apache.paimon:paimon-flink-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B clean install -pl "${test_modules}"
-Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 21211e65e..0e14aaeeb 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -42,8 +42,9 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Arrays.asList(
- "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)",
- "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED) WITH('changelog-producer'='input')");
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)
WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' =
'1')");
}
@Test
@@ -202,12 +203,12 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@Test
public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
iterator.close();
@@ -221,13 +222,31 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
iterator.close();
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 2));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("7", "8", "9"),
(Row.of("10", "11", "12")));
+ iterator.close();
+
// Configure 'scan.snapshot-id' with 'scan.mode=latest'.
assertThatThrownBy(
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
0))
- .hasMessageContaining("Unable to create a source for reading
table");
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "scan.snapshot-id must be null when you use latest for
scan.mode");
}
@Test
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 21211e65e..0e14aaeeb 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -42,8 +42,9 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Arrays.asList(
- "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)",
- "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED) WITH('changelog-producer'='input')");
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)
WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' =
'1')");
}
@Test
@@ -202,12 +203,12 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@Test
public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
+ batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+ batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
- batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
- batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
iterator.close();
@@ -221,13 +222,31 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
iterator.close();
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 1));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4",
"5", "6"));
+ iterator.close();
+
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM T1 /*+
OPTIONS('scan.snapshot-id'='%s') */", 2));
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(Row.of("7", "8", "9"),
(Row.of("10", "11", "12")));
+ iterator.close();
+
// Configure 'scan.snapshot-id' with 'scan.mode=latest'.
assertThatThrownBy(
() ->
streamSqlIter(
"SELECT * FROM T1 /*+
OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
0))
- .hasMessageContaining("Unable to create a source for reading
table");
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "scan.snapshot-id must be null when you use latest for
scan.mode");
}
@Test
diff --git
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index d0553c8c7..663319ec9 100644
---
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -44,7 +44,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
}
@Test
- public void testSetAndResetImmutableOptions() throws Exception {
+ public void testSetAndResetImmutableOptions() {
// bucket-key is immutable
sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");
@@ -53,7 +53,8 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'bucket-key' is not supported yet.");
- sql("CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket-key'
= 'c')");
+ sql(
+ "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket'
= '1', 'bucket-key' = 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.getRootCause()
.isInstanceOf(UnsupportedOperationException.class)
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 76b2dc4ff..74c53a85b 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -113,34 +113,9 @@ under the License.
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
<exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index b88f541c6..df6ee6acb 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -63,6 +63,37 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test dependencies -->
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>