This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b48861f2f [e2e] Add e2e for flink 1.18 (#2214)
b48861f2f is described below
commit b48861f2f5538453c1ce1c9f3a1f66e227f89514
Author: yuzelin <[email protected]>
AuthorDate: Thu Nov 2 13:53:52 2023 +0800
[e2e] Add e2e for flink 1.18 (#2214)
---
.github/workflows/e2e-tests-1.17-jdk11.yml | 4 +-
.github/workflows/e2e-tests-1.17.yml | 4 +-
...sts-1.17-jdk11.yml => e2e-tests-1.18-jdk11.yml} | 4 +-
.../{e2e-tests-1.17.yml => e2e-tests-1.18.yml} | 6 +-
paimon-e2e-tests/pom.xml | 14 ++-
.../java/org/apache/paimon/tests/E2eTestBase.java | 6 +
.../paimon/tests/FlinkProceduresE2eTest.java | 123 +++++++++++++++++++++
.../test/resources-filtered/docker-compose.yaml | 8 +-
.../apache/paimon/hive/Hive23CatalogITCase.java | 10 +-
.../apache/paimon/hive/Hive31CatalogITCase.java | 10 +-
pom.xml | 5 +-
11 files changed, 168 insertions(+), 26 deletions(-)
diff --git a/.github/workflows/e2e-tests-1.17-jdk11.yml b/.github/workflows/e2e-tests-1.17-jdk11.yml
index 805a95c69..912bd0cbd 100644
--- a/.github/workflows/e2e-tests-1.17-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.17-jdk11.yml
@@ -45,7 +45,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.17
- run: mvn -T 1C -B clean install -DskipTests
+ run: mvn -T 1C -B clean install -DskipTests -Pflink-1.17
- name: Test Flink 1.17
timeout-minutes: 60
run: |
@@ -53,6 +53,6 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.17
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.17.yml b/.github/workflows/e2e-tests-1.17.yml
index 5fff734e2..12d4836b1 100644
--- a/.github/workflows/e2e-tests-1.17.yml
+++ b/.github/workflows/e2e-tests-1.17.yml
@@ -44,7 +44,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.17
- run: mvn -T 1C -B clean install -DskipTests
+ run: mvn -T 1C -B clean install -DskipTests -Pflink-1.17
- name: Test Flink 1.17
timeout-minutes: 60
run: |
@@ -52,6 +52,6 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.17
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.17-jdk11.yml b/.github/workflows/e2e-tests-1.18-jdk11.yml
similarity index 94%
copy from .github/workflows/e2e-tests-1.17-jdk11.yml
copy to .github/workflows/e2e-tests-1.18-jdk11.yml
index 805a95c69..a8b42a6df 100644
--- a/.github/workflows/e2e-tests-1.17-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.18-jdk11.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink 1.17 on JDK 11
+name: End to End Tests Flink 1.18 on JDK 11
on:
issue_comment:
@@ -44,7 +44,7 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- - name: Build Flink 1.17
+ - name: Build Flink 1.18
run: mvn -T 1C -B clean install -DskipTests
- name: Test Flink 1.17
timeout-minutes: 60
diff --git a/.github/workflows/e2e-tests-1.17.yml b/.github/workflows/e2e-tests-1.18.yml
similarity index 93%
copy from .github/workflows/e2e-tests-1.17.yml
copy to .github/workflows/e2e-tests-1.18.yml
index 5fff734e2..2985b45c4 100644
--- a/.github/workflows/e2e-tests-1.17.yml
+++ b/.github/workflows/e2e-tests-1.18.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink 1.17
+name: End to End Tests Flink 1.18
on:
push:
@@ -43,9 +43,9 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- - name: Build Flink 1.17
+ - name: Build Flink 1.18
run: mvn -T 1C -B clean install -DskipTests
- - name: Test Flink 1.17
+ - name: Test Flink 1.18
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 0bfdc9dd3..e8eed3588 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -211,7 +211,7 @@ under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.sql.connector.kafka}</artifactId>
- <version>${test.flink.version}</version>
+ <version>${test.flink.connector.kafka.version}</version>
<destFileName>flink-sql-connector-kafka.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
@@ -276,11 +276,21 @@ under the License.
<profiles>
<!-- Activate these profiles with -Pflink-x.xx to build and test
against different Flink versions -->
+ <profile>
+ <id>flink-1.17</id>
+ <properties>
+ <test.flink.main.version>1.17</test.flink.main.version>
+ <test.flink.version>1.17.1</test.flink.version>
+ <test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
+ </properties>
+ </profile>
+
<profile>
<id>flink-1.16</id>
<properties>
<test.flink.main.version>1.16</test.flink.main.version>
<test.flink.version>1.16.2</test.flink.version>
+ <test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
</properties>
</profile>
@@ -289,6 +299,7 @@ under the License.
<properties>
<test.flink.main.version>1.15</test.flink.main.version>
<test.flink.version>1.15.3</test.flink.version>
+ <test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
</properties>
@@ -299,6 +310,7 @@ under the License.
<properties>
<test.flink.main.version>1.14</test.flink.main.version>
<test.flink.version>1.14.6</test.flink.version>
+ <test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
<flink.sql.connector.kafka>flink-sql-connector-kafka_${scala.binary.version}</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
</properties>
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
index 576aeeedf..70eac0d35 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
@@ -141,7 +141,13 @@ public abstract class E2eTestBase {
}
environment.withServices(services.toArray(new String[0])).withLocalCompose(true);
+ environment.waitingFor(
+ "jobmanager_1", Wait.forLogMessage(".*Registering TaskManager.*", 1));
+ environment.waitingFor(
+ "taskmanager_1",
+ Wait.forLogMessage(".*Successful registration at resource manager.*", 1));
environment.start();
+
jobManager = environment.getContainerByServiceName("jobmanager_1").get();
jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
}
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
new file mode 100644
index 000000000..74d1a588f
--- /dev/null
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+import java.util.UUID;
+
+/** Tests for flink {@code Procedure}s. */
+@EnabledIf("runTest")
+public class FlinkProceduresE2eTest extends E2eTestBase {
+
+ private static boolean runTest() {
+ return System.getProperty("test.flink.main.version").compareTo("1.18") >= 0;
+ }
+
+ public FlinkProceduresE2eTest() {
+ super(true, true);
+ }
+
+ @Test
+ public void testCompact() throws Exception {
+ String topicName = "ts-topic-" + UUID.randomUUID();
+ createKafkaTopic(topicName, 1);
+ // prepare first part of test data
+ sendKafkaMessage("1.csv", "20221205,1,100\n20221206,1,100\n20221207,1,100", topicName);
+
+ // create hive catalog to test catalog loading
+ String warehouse = HDFS_ROOT + "/" + UUID.randomUUID() + ".warehouse";
+ String catalogDdl =
+ String.format(
+ "CREATE CATALOG ts_catalog WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'metastore' = 'hive',\n"
+ + " 'uri' = 'thrift://hive-metastore:9083'\n"
+ + ");",
+ warehouse);
+ String useCatalogCmd = "USE CATALOG ts_catalog;";
+
+ String testDataSourceDdl =
+ String.format(
+ "CREATE TEMPORARY TABLE test_source (\n"
+ + " dt STRING,\n"
+ + " k INT,\n"
+ + " v INT"
+ + ") WITH (\n"
+ + " 'connector' = 'kafka',\n"
+ + " 'properties.bootstrap.servers' = 'kafka:9092',\n"
+ + " 'properties.group.id' = 'testGroup',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'topic' = '%s',\n"
+ + " 'format' = 'csv'\n"
+ + ");",
+ topicName);
+
+ String tableDdl =
+ "CREATE TABLE IF NOT EXISTS ts_table (\n"
+ + " dt STRING,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (dt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt) WITH (\n"
+ + " 'changelog-producer' = 'full-compaction',\n"
+ + " 'changelog-producer.compaction-interval' = '1s',\n"
+ + " 'write-only' = 'true'\n"
+ + ");";
+
+ // insert data into paimon
+ runSql(
+ "SET 'execution.checkpointing.interval' = '1s';\n"
+ + "INSERT INTO ts_table SELECT * FROM test_source;",
+ catalogDdl,
+ useCatalogCmd,
+ tableDdl,
+ testDataSourceDdl);
+
+ // execute compact procedure
+ runSql(
+ "SET 'execution.checkpointing.interval' = '1s';\n"
+ + "CALL sys.compact('default.ts_table', 'dt=20221205;dt=20221206');",
+ catalogDdl,
+ useCatalogCmd);
+
+ // read all data from paimon
+ runSql(
+ "INSERT INTO result1 SELECT * FROM ts_table;",
+ catalogDdl,
+ useCatalogCmd,
+ tableDdl,
+ createResultSink("result1", "dt STRING, k INT, v INT"));
+
+ // check that first part of test data are compacted
+ checkResult("20221205, 1, 100", "20221206, 1, 100");
+
+ // prepare second part of test data
+ sendKafkaMessage("2.csv", "20221205,1,101\n20221206,1,101\n20221207,1,101", topicName);
+
+ // check that second part of test data are compacted
+ checkResult("20221205, 1, 101", "20221206, 1, 101");
+ }
+
+ private void runSql(String sql, String... ddls) throws Exception {
+ runSql(String.join("\n", ddls) + "\n" + sql);
+ }
+}
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 42a217554..91869fbf8 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -126,7 +126,7 @@ services:
env_file:
- ./hadoop-hive.env
ports:
- - "50070:50070"
+ - "50070"
datanode:
image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
@@ -142,7 +142,7 @@ services:
environment:
SERVICE_PRECONDITION: "namenode:50070"
ports:
- - "50075:50075"
+ - "50075"
hive-server:
image: bde2020/hive:2.3.2-postgresql-metastore
@@ -159,7 +159,7 @@ services:
HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
SERVICE_PRECONDITION: "hive-metastore:9083"
ports:
- - "10000:10000"
+ - "10000"
hive-metastore:
image: bde2020/hive:2.3.2-postgresql-metastore
@@ -175,7 +175,7 @@ services:
environment:
SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
ports:
- - "9083:9083"
+ - "9083"
hive-metastore-postgresql:
image: bde2020/hive-metastore-postgresql:2.3.0
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
index ee40ae9d8..af1c8f405 100644
--- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.hive;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import com.klarna.hiverunner.annotations.HiveRunnerSetup;
import com.klarna.hiverunner.config.HiveRunnerConfig;
@@ -136,11 +137,10 @@ public class Hive23CatalogITCase extends HiveCatalogITCaseBase {
tEnv.executeSql("CREATE TABLE alter_failed_table(a INT, b STRING)").await();
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE alter_failed_table SET ('aa'='bb')"))
- .isInstanceOf(TableException.class)
- .hasMessage(
- "Could not execute "
- + "ALTER TABLE my_alter_hive.default.alter_failed_table\n"
- + " SET 'aa' = 'bb'");
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ TableException.class,
+ "Could not execute AlterTable in path `my_alter_hive`.`default`.`alter_failed_table`"));
assertThat(
new SchemaManager(
diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
index aafad8b63..400036484 100644
--- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
+++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.hive;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import com.klarna.hiverunner.annotations.HiveRunnerSetup;
import com.klarna.hiverunner.config.HiveRunnerConfig;
@@ -136,11 +137,10 @@ public class Hive31CatalogITCase extends HiveCatalogITCaseBase {
tEnv.executeSql("CREATE TABLE alter_failed_table(a INT, b STRING)").await();
assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE alter_failed_table SET ('aa'='bb')"))
- .isInstanceOf(TableException.class)
- .hasMessage(
- "Could not execute "
- + "ALTER TABLE my_alter_hive.default.alter_failed_table\n"
- + " SET 'aa' = 'bb'");
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ TableException.class,
+ "Could not execute AlterTable in path `my_alter_hive`.`default`.`alter_failed_table`"));
assertThat(
new SchemaManager(
diff --git a/pom.xml b/pom.xml
index 4b3be678f..a7f7553e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,8 +116,9 @@ under the License.
<!-- Can be set to any value to reproduce a specific build. -->
<test.randomization.seed/>
<test.unit.pattern>**/*Test.*</test.unit.pattern>
- <test.flink.main.version>1.17</test.flink.main.version>
- <test.flink.version>1.17.1</test.flink.version>
+ <test.flink.main.version>1.18</test.flink.main.version>
+ <test.flink.version>1.18.0</test.flink.version>
+ <test.flink.connector.kafka.version>3.0.1-1.18</test.flink.connector.kafka.version>
<janino.version>3.0.11</janino.version>
<mockito.version>3.4.6</mockito.version>