This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b48861f2f [e2e] Add e2e for flink 1.18 (#2214)
b48861f2f is described below

commit b48861f2f5538453c1ce1c9f3a1f66e227f89514
Author: yuzelin <[email protected]>
AuthorDate: Thu Nov 2 13:53:52 2023 +0800

    [e2e] Add e2e for flink 1.18 (#2214)
---
 .github/workflows/e2e-tests-1.17-jdk11.yml         |   4 +-
 .github/workflows/e2e-tests-1.17.yml               |   4 +-
 ...sts-1.17-jdk11.yml => e2e-tests-1.18-jdk11.yml} |   4 +-
 .../{e2e-tests-1.17.yml => e2e-tests-1.18.yml}     |   6 +-
 paimon-e2e-tests/pom.xml                           |  14 ++-
 .../java/org/apache/paimon/tests/E2eTestBase.java  |   6 +
 .../paimon/tests/FlinkProceduresE2eTest.java       | 123 +++++++++++++++++++++
 .../test/resources-filtered/docker-compose.yaml    |   8 +-
 .../apache/paimon/hive/Hive23CatalogITCase.java    |  10 +-
 .../apache/paimon/hive/Hive31CatalogITCase.java    |  10 +-
 pom.xml                                            |   5 +-
 11 files changed, 168 insertions(+), 26 deletions(-)

diff --git a/.github/workflows/e2e-tests-1.17-jdk11.yml 
b/.github/workflows/e2e-tests-1.17-jdk11.yml
index 805a95c69..912bd0cbd 100644
--- a/.github/workflows/e2e-tests-1.17-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.17-jdk11.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.17
-        run: mvn -T 1C -B clean install -DskipTests
+        run: mvn -T 1C -B clean install -DskipTests -Pflink-1.17
       - name: Test Flink 1.17
         timeout-minutes: 60
         run: |
@@ -53,6 +53,6 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone 
-Pflink-1.17
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.17.yml 
b/.github/workflows/e2e-tests-1.17.yml
index 5fff734e2..12d4836b1 100644
--- a/.github/workflows/e2e-tests-1.17.yml
+++ b/.github/workflows/e2e-tests-1.17.yml
@@ -44,7 +44,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
       - name: Build Flink 1.17
-        run: mvn -T 1C -B clean install -DskipTests
+        run: mvn -T 1C -B clean install -DskipTests -Pflink-1.17
       - name: Test Flink 1.17
         timeout-minutes: 60
         run: |
@@ -52,6 +52,6 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone 
-Pflink-1.17
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.17-jdk11.yml 
b/.github/workflows/e2e-tests-1.18-jdk11.yml
similarity index 94%
copy from .github/workflows/e2e-tests-1.17-jdk11.yml
copy to .github/workflows/e2e-tests-1.18-jdk11.yml
index 805a95c69..a8b42a6df 100644
--- a/.github/workflows/e2e-tests-1.17-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.18-jdk11.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: End to End Tests Flink 1.17 on JDK 11
+name: End to End Tests Flink 1.18 on JDK 11
 
 on:
   issue_comment:
@@ -44,7 +44,7 @@ jobs:
         with:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
-      - name: Build Flink 1.17
+      - name: Build Flink 1.18
         run: mvn -T 1C -B clean install -DskipTests
       - name: Test Flink 1.17
         timeout-minutes: 60
diff --git a/.github/workflows/e2e-tests-1.17.yml 
b/.github/workflows/e2e-tests-1.18.yml
similarity index 93%
copy from .github/workflows/e2e-tests-1.17.yml
copy to .github/workflows/e2e-tests-1.18.yml
index 5fff734e2..2985b45c4 100644
--- a/.github/workflows/e2e-tests-1.17.yml
+++ b/.github/workflows/e2e-tests-1.18.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: End to End Tests Flink 1.17
+name: End to End Tests Flink 1.18
 
 on:
   push:
@@ -43,9 +43,9 @@ jobs:
         with:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
-      - name: Build Flink 1.17
+      - name: Build Flink 1.18
         run: mvn -T 1C -B clean install -DskipTests
-      - name: Test Flink 1.17
+      - name: Test Flink 1.18
         timeout-minutes: 60
         run: |
           # run tests with random timezone to find out timezone related bugs
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 0bfdc9dd3..e8eed3588 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -211,7 +211,7 @@ under the License.
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             
<artifactId>${flink.sql.connector.kafka}</artifactId>
-                            <version>${test.flink.version}</version>
+                            
<version>${test.flink.connector.kafka.version}</version>
                             
<destFileName>flink-sql-connector-kafka.jar</destFileName>
                             <type>jar</type>
                             <overWrite>true</overWrite>
@@ -276,11 +276,21 @@ under the License.
 
     <profiles>
         <!-- Activate these profiles with -Pflink-x.xx to build and test 
against different Flink versions -->
+        <profile>
+            <id>flink-1.17</id>
+            <properties>
+                <test.flink.main.version>1.17</test.flink.main.version>
+                <test.flink.version>1.17.1</test.flink.version>
+                
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>flink-1.16</id>
             <properties>
                 <test.flink.main.version>1.16</test.flink.main.version>
                 <test.flink.version>1.16.2</test.flink.version>
+                
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
             </properties>
         </profile>
 
@@ -289,6 +299,7 @@ under the License.
             <properties>
                 <test.flink.main.version>1.15</test.flink.main.version>
                 <test.flink.version>1.15.3</test.flink.version>
+                
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
             </properties>
@@ -299,6 +310,7 @@ under the License.
             <properties>
                 <test.flink.main.version>1.14</test.flink.main.version>
                 <test.flink.version>1.14.6</test.flink.version>
+                
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.kafka>flink-sql-connector-kafka_${scala.binary.version}</flink.sql.connector.kafka>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
             </properties>
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
index 576aeeedf..70eac0d35 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
@@ -141,7 +141,13 @@ public abstract class E2eTestBase {
         }
         environment.withServices(services.toArray(new 
String[0])).withLocalCompose(true);
 
+        environment.waitingFor(
+                "jobmanager_1", Wait.forLogMessage(".*Registering 
TaskManager.*", 1));
+        environment.waitingFor(
+                "taskmanager_1",
+                Wait.forLogMessage(".*Successful registration at resource 
manager.*", 1));
         environment.start();
+
         jobManager = 
environment.getContainerByServiceName("jobmanager_1").get();
         jobManager.execInContainer("chown", "-R", "flink:flink", 
TEST_DATA_DIR);
     }
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
new file mode 100644
index 000000000..74d1a588f
--- /dev/null
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tests;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+import java.util.UUID;
+
+/** Tests for flink {@code Procedure}s. */
+@EnabledIf("runTest")
+public class FlinkProceduresE2eTest extends E2eTestBase {
+
+    private static boolean runTest() {
+        return System.getProperty("test.flink.main.version").compareTo("1.18") 
>= 0;
+    }
+
+    public FlinkProceduresE2eTest() {
+        super(true, true);
+    }
+
+    @Test
+    public void testCompact() throws Exception {
+        String topicName = "ts-topic-" + UUID.randomUUID();
+        createKafkaTopic(topicName, 1);
+        // prepare first part of test data
+        sendKafkaMessage("1.csv", 
"20221205,1,100\n20221206,1,100\n20221207,1,100", topicName);
+
+        // create hive catalog to test catalog loading
+        String warehouse = HDFS_ROOT + "/" + UUID.randomUUID() + ".warehouse";
+        String catalogDdl =
+                String.format(
+                        "CREATE CATALOG ts_catalog WITH (\n"
+                                + "    'type' = 'paimon',\n"
+                                + "    'warehouse' = '%s',\n"
+                                + "    'metastore' = 'hive',\n"
+                                + "    'uri' = 
'thrift://hive-metastore:9083'\n"
+                                + ");",
+                        warehouse);
+        String useCatalogCmd = "USE CATALOG ts_catalog;";
+
+        String testDataSourceDdl =
+                String.format(
+                        "CREATE TEMPORARY TABLE test_source (\n"
+                                + "    dt STRING,\n"
+                                + "    k INT,\n"
+                                + "    v INT"
+                                + ") WITH (\n"
+                                + "    'connector' = 'kafka',\n"
+                                + "    'properties.bootstrap.servers' = 
'kafka:9092',\n"
+                                + "    'properties.group.id' = 'testGroup',\n"
+                                + "    'scan.startup.mode' = 
'earliest-offset',\n"
+                                + "    'topic' = '%s',\n"
+                                + "    'format' = 'csv'\n"
+                                + ");",
+                        topicName);
+
+        String tableDdl =
+                "CREATE TABLE IF NOT EXISTS ts_table (\n"
+                        + "    dt STRING,\n"
+                        + "    k INT,\n"
+                        + "    v INT,\n"
+                        + "    PRIMARY KEY (dt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt) WITH (\n"
+                        + "    'changelog-producer' = 'full-compaction',\n"
+                        + "    'changelog-producer.compaction-interval' = 
'1s',\n"
+                        + "    'write-only' = 'true'\n"
+                        + ");";
+
+        // insert data into paimon
+        runSql(
+                "SET 'execution.checkpointing.interval' = '1s';\n"
+                        + "INSERT INTO ts_table SELECT * FROM test_source;",
+                catalogDdl,
+                useCatalogCmd,
+                tableDdl,
+                testDataSourceDdl);
+
+        // execute compact procedure
+        runSql(
+                "SET 'execution.checkpointing.interval' = '1s';\n"
+                        + "CALL sys.compact('default.ts_table', 
'dt=20221205;dt=20221206');",
+                catalogDdl,
+                useCatalogCmd);
+
+        // read all data from paimon
+        runSql(
+                "INSERT INTO result1 SELECT * FROM ts_table;",
+                catalogDdl,
+                useCatalogCmd,
+                tableDdl,
+                createResultSink("result1", "dt STRING, k INT, v INT"));
+
+        // check that first part of test data are compacted
+        checkResult("20221205, 1, 100", "20221206, 1, 100");
+
+        // prepare second part of test data
+        sendKafkaMessage("2.csv", 
"20221205,1,101\n20221206,1,101\n20221207,1,101", topicName);
+
+        // check that second part of test data are compacted
+        checkResult("20221205, 1, 101", "20221206, 1, 101");
+    }
+
+    private void runSql(String sql, String... ddls) throws Exception {
+        runSql(String.join("\n", ddls) + "\n" + sql);
+    }
+}
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml 
b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 42a217554..91869fbf8 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -126,7 +126,7 @@ services:
     env_file:
       - ./hadoop-hive.env
     ports:
-      - "50070:50070"
+      - "50070"
 
   datanode:
     image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
@@ -142,7 +142,7 @@ services:
     environment:
       SERVICE_PRECONDITION: "namenode:50070"
     ports:
-      - "50075:50075"
+      - "50075"
 
   hive-server:
     image: bde2020/hive:2.3.2-postgresql-metastore
@@ -159,7 +159,7 @@ services:
       HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: 
"jdbc:postgresql://hive-metastore/metastore"
       SERVICE_PRECONDITION: "hive-metastore:9083"
     ports:
-      - "10000:10000"
+      - "10000"
 
   hive-metastore:
     image: bde2020/hive:2.3.2-postgresql-metastore
@@ -175,7 +175,7 @@ services:
     environment:
       SERVICE_PRECONDITION: "namenode:50070 datanode:50075 
hive-metastore-postgresql:5432"
     ports:
-      - "9083:9083"
+      - "9083"
 
   hive-metastore-postgresql:
     image: bde2020/hive-metastore-postgresql:2.3.0
diff --git 
a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
 
b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
index ee40ae9d8..af1c8f405 100644
--- 
a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.hive;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 
 import com.klarna.hiverunner.annotations.HiveRunnerSetup;
 import com.klarna.hiverunner.config.HiveRunnerConfig;
@@ -136,11 +137,10 @@ public class Hive23CatalogITCase extends 
HiveCatalogITCaseBase {
         tEnv.executeSql("CREATE TABLE alter_failed_table(a INT, b 
STRING)").await();
 
         assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE 
alter_failed_table SET ('aa'='bb')"))
-                .isInstanceOf(TableException.class)
-                .hasMessage(
-                        "Could not execute "
-                                + "ALTER TABLE 
my_alter_hive.default.alter_failed_table\n"
-                                + "  SET 'aa' = 'bb'");
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                TableException.class,
+                                "Could not execute AlterTable in path 
`my_alter_hive`.`default`.`alter_failed_table`"));
 
         assertThat(
                         new SchemaManager(
diff --git 
a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
 
b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
index aafad8b63..400036484 100644
--- 
a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.hive;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 
 import com.klarna.hiverunner.annotations.HiveRunnerSetup;
 import com.klarna.hiverunner.config.HiveRunnerConfig;
@@ -136,11 +137,10 @@ public class Hive31CatalogITCase extends 
HiveCatalogITCaseBase {
         tEnv.executeSql("CREATE TABLE alter_failed_table(a INT, b 
STRING)").await();
 
         assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE 
alter_failed_table SET ('aa'='bb')"))
-                .isInstanceOf(TableException.class)
-                .hasMessage(
-                        "Could not execute "
-                                + "ALTER TABLE 
my_alter_hive.default.alter_failed_table\n"
-                                + "  SET 'aa' = 'bb'");
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                TableException.class,
+                                "Could not execute AlterTable in path 
`my_alter_hive`.`default`.`alter_failed_table`"));
 
         assertThat(
                         new SchemaManager(
diff --git a/pom.xml b/pom.xml
index 4b3be678f..a7f7553e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,8 +116,9 @@ under the License.
         <!-- Can be set to any value to reproduce a specific build. -->
         <test.randomization.seed/>
         <test.unit.pattern>**/*Test.*</test.unit.pattern>
-        <test.flink.main.version>1.17</test.flink.main.version>
-        <test.flink.version>1.17.1</test.flink.version>
+        <test.flink.main.version>1.18</test.flink.main.version>
+        <test.flink.version>1.18.0</test.flink.version>
+        
<test.flink.connector.kafka.version>3.0.1-1.18</test.flink.connector.kafka.version>
 
         <janino.version>3.0.11</janino.version>
         <mockito.version>3.4.6</mockito.version>

Reply via email to