This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new d6ce2ab  Add module for trino 422 (#20)
d6ce2ab is described below

commit d6ce2ab07c99212e367c889726a120ae40362ce5
Author: tsreaper <[email protected]>
AuthorDate: Mon Jul 24 09:51:26 2023 +0800

    Add module for trino 422 (#20)
---
 .github/workflows/ci-jdk17.yml                     |   2 +-
 .github/workflows/publish_snapshot_jdk17.yml       |   2 +-
 paimon-trino-358/pom.xml                           |   9 +
 .../apache/paimon/trino/SimpleTableTestHelper.java |  67 ----
 .../apache/paimon/trino/TestTrino358ITCase.java    |  31 ++
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 384 -------------------
 paimon-trino-368/pom.xml                           |   9 +
 .../apache/paimon/trino/SimpleTableTestHelper.java |  67 ----
 .../apache/paimon/trino/TestTrino368ITCase.java    |  22 ++
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 406 ---------------------
 paimon-trino-369/pom.xml                           |   9 +
 .../apache/paimon/trino/SimpleTableTestHelper.java |  67 ----
 .../apache/paimon/trino/TestTrino369ITCase.java    |  22 ++
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 406 ---------------------
 paimon-trino-370/pom.xml                           |   9 +
 .../apache/paimon/trino/SimpleTableTestHelper.java |  67 ----
 .../apache/paimon/trino/TestTrino370ITCase.java    |  22 ++
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 406 ---------------------
 paimon-trino-388/pom.xml                           |   9 +
 .../apache/paimon/trino/TestTrino388ITCase.java    |  22 ++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 paimon-trino-393/pom.xml                           |   9 +
 .../apache/paimon/trino/SimpleTableTestHelper.java |  67 ----
 .../apache/paimon/trino/TestTrino393ITCase.java    |  22 ++
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 406 ---------------------
 {paimon-trino-393 => paimon-trino-422}/pom.xml     |  15 +-
 .../apache/paimon/trino/TestTrino422ITCase.java    |  22 ++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 paimon-trino-common/pom.xml                        |  12 +
 .../org/apache/paimon/trino/TestTrinoITCase.java   |   2 +-
 pom.xml                                            |   7 +-
 31 files changed, 304 insertions(+), 2352 deletions(-)

diff --git a/.github/workflows/ci-jdk17.yml b/.github/workflows/ci-jdk17.yml
index cc70d1e..803ba0a 100644
--- a/.github/workflows/ci-jdk17.yml
+++ b/.github/workflows/ci-jdk17.yml
@@ -69,4 +69,4 @@ jobs:
           java-version: ${{ matrix.java }}
       - name: Build
         run: |
-          mvn clean install -DskipTests -pl 
paimon-trino-common,paimon-trino-393
+          mvn clean install -DskipTests -pl 
paimon-trino-common,paimon-trino-393,paimon-trino-422
diff --git a/.github/workflows/publish_snapshot_jdk17.yml 
b/.github/workflows/publish_snapshot_jdk17.yml
index 81970ee..3d73996 100644
--- a/.github/workflows/publish_snapshot_jdk17.yml
+++ b/.github/workflows/publish_snapshot_jdk17.yml
@@ -81,6 +81,6 @@ jobs:
           echo "<password>$ASF_PASSWORD</password>" >> $tmp_settings
           echo "</server></servers></settings>" >> $tmp_settings
           
-          mvn --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip 
-DskipTests -Papache-release -pl paimon-trino-common,paimon-trino-393
+          mvn --settings $tmp_settings clean deploy -Dgpg.skip -Drat.skip 
-DskipTests -Papache-release -pl 
paimon-trino-common,paimon-trino-393,paimon-trino-422
           
           rm $tmp_settings
diff --git a/paimon-trino-358/pom.xml b/paimon-trino-358/pom.xml
index 0366d79..303b9f3 100644
--- a/paimon-trino-358/pom.xml
+++ b/paimon-trino-358/pom.xml
@@ -73,6 +73,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-358/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
 
b/paimon-trino-358/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
deleted file mode 100644
index 621cfbc..0000000
--- 
a/paimon-trino-358/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
-    private final InnerTableWrite writer;
-    private final InnerTableCommit commit;
-
-    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
-        new SchemaManager(LocalFileIO.create(), path)
-                .createTable(
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                new HashMap<>() {
-                                    {
-                                        put("write-mode", "change-log");
-                                    }
-                                },
-                                ""));
-        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), path);
-        String user = "user";
-        this.writer = table.newWrite(user);
-        this.commit = table.newCommit(user);
-    }
-
-    public void write(InternalRow row) throws Exception {
-        writer.write(row);
-    }
-
-    public void commit() throws Exception {
-        commit.commit(0, writer.prepareCommit(true, 0));
-    }
-}
diff --git 
a/paimon-trino-358/src/test/java/org/apache/paimon/trino/TestTrino358ITCase.java
 
b/paimon-trino-358/src/test/java/org/apache/paimon/trino/TestTrino358ITCase.java
new file mode 100644
index 0000000..fe87b21
--- /dev/null
+++ 
b/paimon-trino-358/src/test/java/org/apache/paimon/trino/TestTrino358ITCase.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+import org.testng.annotations.Test;
+
+/** {@link TestTrinoITCase} for Trino 358. */
+public class TestTrino358ITCase extends TestTrinoITCase {
+
+    @Override
+    @Test
+    public void testSetTableProperties() {
+        // not supported
+    }
+}
diff --git 
a/paimon-trino-358/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java 
b/paimon-trino-358/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
deleted file mode 100644
index edd7133..0000000
--- 
a/paimon-trino-358/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import org.testng.annotations.Test;
-
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.paimon.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for trino connector. */
-public class TestTrinoITCase extends AbstractTestQueryFramework {
-
-    private static final String CATALOG = "paimon";
-    private static final String DB = "default";
-
-    @Override
-    protected QueryRunner createQueryRunner() throws Exception {
-        String warehouse =
-                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
-        // flink sink
-        Path tablePath1 = new Path(warehouse, DB + ".db/t1");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
-        testHelper1.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper1.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper1.write(
-                GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.commit();
-
-        Path tablePath2 = new Path(warehouse, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
-        testHelper2.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper2.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper2.commit();
-        testHelper2.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper2.write(GenericRow.of(7, 8L, fromString("4"), 
fromString("4")));
-        testHelper2.commit();
-
-        {
-            Path tablePath3 = new Path(warehouse, "default.db/t3");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "pt", DataTypes.STRING()),
-                                    new DataField(1, "a", new IntType()),
-                                    new DataField(2, "b", new BigIntType()),
-                                    new DataField(3, "c", new BigIntType()),
-                                    new DataField(4, "d", new IntType())));
-            new SchemaManager(LocalFileIO.create(), tablePath3)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.singletonList("pt"),
-                                    Collections.emptyList(),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(GenericRow.of(fromString("1"), 1, 1L, 1L, 1));
-            writer.write(GenericRow.of(fromString("1"), 1, 2L, 2L, 2));
-            writer.write(GenericRow.of(fromString("2"), 3, 3L, 3L, 3));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        {
-            Path tablePath4 = new Path(warehouse, "default.db/t4");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "i", new IntType()),
-                                    new DataField(
-                                            1,
-                                            "map",
-                                            new MapType(
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH),
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH)))));
-            new SchemaManager(LocalFileIO.create(), tablePath4)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.emptyList(),
-                                    Collections.singletonList("i"),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(
-                    GenericRow.of(
-                            1,
-                            new GenericMap(
-                                    new HashMap<>() {
-                                        {
-                                            put(fromString("1"), 
fromString("2"));
-                                        }
-                                    })));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        DistributedQueryRunner queryRunner = null;
-        try {
-            queryRunner =
-                    DistributedQueryRunner.builder(
-                                    
testSessionBuilder().setCatalog(CATALOG).setSchema(DB).build())
-                            .build();
-            queryRunner.installPlugin(new TrinoPlugin());
-            Map<String, String> options = new HashMap<>();
-            options.put("warehouse", warehouse);
-            queryRunner.createCatalog(CATALOG, CATALOG, options);
-            return queryRunner;
-        } catch (Throwable e) {
-            closeAllSuppress(e, queryRunner);
-            throw e;
-        }
-    }
-
-    private static SimpleTableTestHelper createTestHelper(Path tablePath) 
throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new DataField(0, "a", new IntType()),
-                                new DataField(1, "b", new BigIntType()),
-                                // test field name has upper case
-                                new DataField(2, "aCa", new VarCharType()),
-                                new DataField(3, "d", new CharType(1))));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    @Test
-    public void testComplexTypes() {
-        assertThat(sql("SELECT * FROM paimon.default.t4")).isEqualTo("[[1, 
{1=2}]]");
-    }
-
-    @Test
-    public void testProjection() {
-        assertThat(sql("SELECT * FROM paimon.default.t1"))
-                .isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]");
-        assertThat(sql("SELECT a, aCa FROM 
paimon.default.t1")).isEqualTo("[[1, 1], [5, 3]]");
-        assertThat(sql("SELECT SUM(b) FROM 
paimon.default.t1")).isEqualTo("[[8]]");
-    }
-
-    @Test
-    public void testSystemTable() {
-        assertThat(
-                        sql(
-                                "SELECT 
snapshot_id,schema_id,commit_user,commit_identifier,commit_kind FROM 
\"t1$snapshots\""))
-                .isEqualTo("[[1, 0, user, 0, APPEND]]");
-    }
-
-    @Test
-    public void testFilter() {
-        assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
-                .isEqualTo("[[1, 1], [3, 2]]");
-    }
-
-    @Test
-    public void testGroupByWithCast() {
-        assertThat(
-                        sql(
-                                "SELECT pt, a, SUM(b), SUM(d) FROM 
paimon.default.t3 GROUP BY pt, a ORDER BY pt, a"))
-                .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
-    }
-
-    @Test
-    public void testShowCreateTable() {
-        assertThat(sql("SHOW CREATE TABLE paimon.default.t3"))
-                .isEqualTo(
-                        "[[CREATE TABLE paimon.default.t3 (\n"
-                                + "   pt varchar,\n"
-                                + "   a integer,\n"
-                                + "   b bigint,\n"
-                                + "   c bigint,\n"
-                                + "   d integer\n"
-                                + ")]]");
-    }
-
-    @Test
-    public void testCreateSchema() {
-        sql("CREATE SCHEMA paimon.test");
-        assertThat(sql("SHOW SCHEMAS FROM paimon"))
-                .isEqualTo("[[default], [information_schema], [test]]");
-        sql("DROP SCHEMA paimon.test");
-    }
-
-    @Test
-    public void testDropSchema() {
-        sql("CREATE SCHEMA paimon.tpch");
-        sql("DROP SCHEMA paimon.tpch");
-        assertThat(sql("SHOW SCHEMAS FROM paimon")).isEqualTo("[[default], 
[information_schema]]");
-    }
-
-    @Test
-    public void testCreateTable() {
-        sql(
-                "CREATE TABLE orders ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[orders], [t1], [t2], [t3], [t4]]");
-        sql("DROP TABLE IF EXISTS paimon.default.orders");
-    }
-
-    @Test
-    public void testRenameTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[t1], [t2], [t3], [t4], [t6]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t6");
-    }
-
-    @Test
-    public void testDropTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-        assertThat(sql("SHOW TABLES FROM paimon.default")).isEqualTo("[[t1], 
[t2], [t3], [t4]]");
-    }
-
-    @Test
-    public void testAddColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 ADD COLUMN zip varchar");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [order_status, 
varchar(2147483646), , ], [total_price, double, , ], [order_date, date, , ], 
[zip, varchar(2147483646), , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testRenameColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME COLUMN order_status to g");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [g, varchar(2147483646), , 
], [total_price, double, , ], [order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testDropColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 DROP COLUMN order_status");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [total_price, double, , ], 
[order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    private String sql(String sql) {
-        MaterializedResult result = getQueryRunner().execute(sql);
-        return result.getMaterializedRows().toString();
-    }
-}
diff --git a/paimon-trino-368/pom.xml b/paimon-trino-368/pom.xml
index af7a8dc..7fda181 100644
--- a/paimon-trino-368/pom.xml
+++ b/paimon-trino-368/pom.xml
@@ -73,6 +73,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-368/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
 
b/paimon-trino-368/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
deleted file mode 100644
index 621cfbc..0000000
--- 
a/paimon-trino-368/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
-    private final InnerTableWrite writer;
-    private final InnerTableCommit commit;
-
-    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
-        new SchemaManager(LocalFileIO.create(), path)
-                .createTable(
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                new HashMap<>() {
-                                    {
-                                        put("write-mode", "change-log");
-                                    }
-                                },
-                                ""));
-        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), path);
-        String user = "user";
-        this.writer = table.newWrite(user);
-        this.commit = table.newCommit(user);
-    }
-
-    public void write(InternalRow row) throws Exception {
-        writer.write(row);
-    }
-
-    public void commit() throws Exception {
-        commit.commit(0, writer.prepareCommit(true, 0));
-    }
-}
diff --git 
a/paimon-trino-368/src/test/java/org/apache/paimon/trino/TestTrino368ITCase.java
 
b/paimon-trino-368/src/test/java/org/apache/paimon/trino/TestTrino368ITCase.java
new file mode 100644
index 0000000..6063600
--- /dev/null
+++ 
b/paimon-trino-368/src/test/java/org/apache/paimon/trino/TestTrino368ITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+/** {@link TestTrinoITCase} for Trino 368. */
+public class TestTrino368ITCase extends TestTrinoITCase {}
diff --git 
a/paimon-trino-368/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java 
b/paimon-trino-368/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
deleted file mode 100644
index c3e94b7..0000000
--- 
a/paimon-trino-368/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import org.testng.annotations.Test;
-
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.paimon.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for trino connector. */
-public class TestTrinoITCase extends AbstractTestQueryFramework {
-
-    private static final String CATALOG = "paimon";
-    private static final String DB = "default";
-
-    @Override
-    protected QueryRunner createQueryRunner() throws Exception {
-        String warehouse =
-                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
-        // flink sink
-        Path tablePath1 = new Path(warehouse, DB + ".db/t1");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
-        testHelper1.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper1.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper1.write(
-                GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.commit();
-
-        Path tablePath2 = new Path(warehouse, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
-        testHelper2.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper2.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper2.commit();
-        testHelper2.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper2.write(GenericRow.of(7, 8L, fromString("4"), 
fromString("4")));
-        testHelper2.commit();
-
-        {
-            Path tablePath3 = new Path(warehouse, "default.db/t3");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "pt", DataTypes.STRING()),
-                                    new DataField(1, "a", new IntType()),
-                                    new DataField(2, "b", new BigIntType()),
-                                    new DataField(3, "c", new BigIntType()),
-                                    new DataField(4, "d", new IntType())));
-            new SchemaManager(LocalFileIO.create(), tablePath3)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.singletonList("pt"),
-                                    Collections.emptyList(),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(GenericRow.of(fromString("1"), 1, 1L, 1L, 1));
-            writer.write(GenericRow.of(fromString("1"), 1, 2L, 2L, 2));
-            writer.write(GenericRow.of(fromString("2"), 3, 3L, 3L, 3));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        {
-            Path tablePath4 = new Path(warehouse, "default.db/t4");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "i", new IntType()),
-                                    new DataField(
-                                            1,
-                                            "map",
-                                            new MapType(
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH),
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH)))));
-            new SchemaManager(LocalFileIO.create(), tablePath4)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.emptyList(),
-                                    Collections.singletonList("i"),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(
-                    GenericRow.of(
-                            1,
-                            new GenericMap(
-                                    new HashMap<>() {
-                                        {
-                                            put(fromString("1"), 
fromString("2"));
-                                        }
-                                    })));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        DistributedQueryRunner queryRunner = null;
-        try {
-            queryRunner =
-                    DistributedQueryRunner.builder(
-                                    
testSessionBuilder().setCatalog(CATALOG).setSchema(DB).build())
-                            .build();
-            queryRunner.installPlugin(new TrinoPlugin());
-            Map<String, String> options = new HashMap<>();
-            options.put("warehouse", warehouse);
-            queryRunner.createCatalog(CATALOG, CATALOG, options);
-            return queryRunner;
-        } catch (Throwable e) {
-            closeAllSuppress(e, queryRunner);
-            throw e;
-        }
-    }
-
-    private static SimpleTableTestHelper createTestHelper(Path tablePath) 
throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new DataField(0, "a", new IntType()),
-                                new DataField(1, "b", new BigIntType()),
-                                // test field name has upper case
-                                new DataField(2, "aCa", new VarCharType()),
-                                new DataField(3, "d", new CharType(1))));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    @Test
-    public void testComplexTypes() {
-        assertThat(sql("SELECT * FROM paimon.default.t4")).isEqualTo("[[1, 
{1=2}]]");
-    }
-
-    @Test
-    public void testProjection() {
-        assertThat(sql("SELECT * FROM paimon.default.t1"))
-                .isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]");
-        assertThat(sql("SELECT a, aCa FROM 
paimon.default.t1")).isEqualTo("[[1, 1], [5, 3]]");
-        assertThat(sql("SELECT SUM(b) FROM 
paimon.default.t1")).isEqualTo("[[8]]");
-    }
-
-    @Test
-    public void testSystemTable() {
-        assertThat(
-                        sql(
-                                "SELECT 
snapshot_id,schema_id,commit_user,commit_identifier,commit_kind FROM 
\"t1$snapshots\""))
-                .isEqualTo("[[1, 0, user, 0, APPEND]]");
-    }
-
-    @Test
-    public void testFilter() {
-        assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
-                .isEqualTo("[[1, 1], [3, 2]]");
-    }
-
-    @Test
-    public void testGroupByWithCast() {
-        assertThat(
-                        sql(
-                                "SELECT pt, a, SUM(b), SUM(d) FROM 
paimon.default.t3 GROUP BY pt, a ORDER BY pt, a"))
-                .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
-    }
-
-    @Test
-    public void testShowCreateTable() {
-        assertThat(sql("SHOW CREATE TABLE paimon.default.t3"))
-                .isEqualTo(
-                        "[[CREATE TABLE paimon.default.t3 (\n"
-                                + "   pt varchar,\n"
-                                + "   a integer,\n"
-                                + "   b bigint,\n"
-                                + "   c bigint,\n"
-                                + "   d integer\n"
-                                + ")]]");
-    }
-
-    @Test
-    public void testCreateSchema() {
-        sql("CREATE SCHEMA paimon.test");
-        assertThat(sql("SHOW SCHEMAS FROM paimon"))
-                .isEqualTo("[[default], [information_schema], [test]]");
-        sql("DROP SCHEMA paimon.test");
-    }
-
-    @Test
-    public void testDropSchema() {
-        sql("CREATE SCHEMA paimon.tpch");
-        sql("DROP SCHEMA paimon.tpch");
-        assertThat(sql("SHOW SCHEMAS FROM paimon")).isEqualTo("[[default], 
[information_schema]]");
-    }
-
-    @Test
-    public void testCreateTable() {
-        sql(
-                "CREATE TABLE orders ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[orders], [t1], [t2], [t3], [t4]]");
-        sql("DROP TABLE IF EXISTS paimon.default.orders");
-    }
-
-    @Test
-    public void testRenameTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[t1], [t2], [t3], [t4], [t6]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t6");
-    }
-
-    @Test
-    public void testDropTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-        assertThat(sql("SHOW TABLES FROM paimon.default")).isEqualTo("[[t1], 
[t2], [t3], [t4]]");
-    }
-
-    @Test
-    public void testAddColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 ADD COLUMN zip varchar");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [order_status, 
varchar(2147483646), , ], [total_price, double, , ], [order_date, date, , ], 
[zip, varchar(2147483646), , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testRenameColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME COLUMN order_status to g");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [g, varchar(2147483646), , 
], [total_price, double, , ], [order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testDropColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 DROP COLUMN order_status");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [total_price, double, , ], 
[order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testSetTableProperties() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql(
-                "ALTER TABLE paimon.default.t5 SET PROPERTIES bucket = 
'4',snapshot_time_retained = '4h'");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    private String sql(String sql) {
-        MaterializedResult result = getQueryRunner().execute(sql);
-        return result.getMaterializedRows().toString();
-    }
-}
diff --git a/paimon-trino-369/pom.xml b/paimon-trino-369/pom.xml
index 0c50883..b8ec647 100644
--- a/paimon-trino-369/pom.xml
+++ b/paimon-trino-369/pom.xml
@@ -73,6 +73,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-369/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
 
b/paimon-trino-369/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
deleted file mode 100644
index 621cfbc..0000000
--- 
a/paimon-trino-369/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
-    private final InnerTableWrite writer;
-    private final InnerTableCommit commit;
-
-    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
-        new SchemaManager(LocalFileIO.create(), path)
-                .createTable(
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                new HashMap<>() {
-                                    {
-                                        put("write-mode", "change-log");
-                                    }
-                                },
-                                ""));
-        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), path);
-        String user = "user";
-        this.writer = table.newWrite(user);
-        this.commit = table.newCommit(user);
-    }
-
-    public void write(InternalRow row) throws Exception {
-        writer.write(row);
-    }
-
-    public void commit() throws Exception {
-        commit.commit(0, writer.prepareCommit(true, 0));
-    }
-}
diff --git 
a/paimon-trino-369/src/test/java/org/apache/paimon/trino/TestTrino369ITCase.java
 
b/paimon-trino-369/src/test/java/org/apache/paimon/trino/TestTrino369ITCase.java
new file mode 100644
index 0000000..31652d0
--- /dev/null
+++ 
b/paimon-trino-369/src/test/java/org/apache/paimon/trino/TestTrino369ITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+/** {@link TestTrinoITCase} for Trino 369. */
+public class TestTrino369ITCase extends TestTrinoITCase {}
diff --git 
a/paimon-trino-369/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java 
b/paimon-trino-369/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
deleted file mode 100644
index c3e94b7..0000000
--- 
a/paimon-trino-369/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import org.testng.annotations.Test;
-
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.paimon.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for trino connector. */
-public class TestTrinoITCase extends AbstractTestQueryFramework {
-
-    private static final String CATALOG = "paimon";
-    private static final String DB = "default";
-
-    @Override
-    protected QueryRunner createQueryRunner() throws Exception {
-        String warehouse =
-                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
-        // flink sink
-        Path tablePath1 = new Path(warehouse, DB + ".db/t1");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
-        testHelper1.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper1.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper1.write(
-                GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.commit();
-
-        Path tablePath2 = new Path(warehouse, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
-        testHelper2.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper2.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper2.commit();
-        testHelper2.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper2.write(GenericRow.of(7, 8L, fromString("4"), 
fromString("4")));
-        testHelper2.commit();
-
-        {
-            Path tablePath3 = new Path(warehouse, "default.db/t3");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "pt", DataTypes.STRING()),
-                                    new DataField(1, "a", new IntType()),
-                                    new DataField(2, "b", new BigIntType()),
-                                    new DataField(3, "c", new BigIntType()),
-                                    new DataField(4, "d", new IntType())));
-            new SchemaManager(LocalFileIO.create(), tablePath3)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.singletonList("pt"),
-                                    Collections.emptyList(),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(GenericRow.of(fromString("1"), 1, 1L, 1L, 1));
-            writer.write(GenericRow.of(fromString("1"), 1, 2L, 2L, 2));
-            writer.write(GenericRow.of(fromString("2"), 3, 3L, 3L, 3));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        {
-            Path tablePath4 = new Path(warehouse, "default.db/t4");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "i", new IntType()),
-                                    new DataField(
-                                            1,
-                                            "map",
-                                            new MapType(
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH),
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH)))));
-            new SchemaManager(LocalFileIO.create(), tablePath4)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.emptyList(),
-                                    Collections.singletonList("i"),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(
-                    GenericRow.of(
-                            1,
-                            new GenericMap(
-                                    new HashMap<>() {
-                                        {
-                                            put(fromString("1"), 
fromString("2"));
-                                        }
-                                    })));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        DistributedQueryRunner queryRunner = null;
-        try {
-            queryRunner =
-                    DistributedQueryRunner.builder(
-                                    
testSessionBuilder().setCatalog(CATALOG).setSchema(DB).build())
-                            .build();
-            queryRunner.installPlugin(new TrinoPlugin());
-            Map<String, String> options = new HashMap<>();
-            options.put("warehouse", warehouse);
-            queryRunner.createCatalog(CATALOG, CATALOG, options);
-            return queryRunner;
-        } catch (Throwable e) {
-            closeAllSuppress(e, queryRunner);
-            throw e;
-        }
-    }
-
-    private static SimpleTableTestHelper createTestHelper(Path tablePath) 
throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new DataField(0, "a", new IntType()),
-                                new DataField(1, "b", new BigIntType()),
-                                // test field name has upper case
-                                new DataField(2, "aCa", new VarCharType()),
-                                new DataField(3, "d", new CharType(1))));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    @Test
-    public void testComplexTypes() {
-        assertThat(sql("SELECT * FROM paimon.default.t4")).isEqualTo("[[1, 
{1=2}]]");
-    }
-
-    @Test
-    public void testProjection() {
-        assertThat(sql("SELECT * FROM paimon.default.t1"))
-                .isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]");
-        assertThat(sql("SELECT a, aCa FROM 
paimon.default.t1")).isEqualTo("[[1, 1], [5, 3]]");
-        assertThat(sql("SELECT SUM(b) FROM 
paimon.default.t1")).isEqualTo("[[8]]");
-    }
-
-    @Test
-    public void testSystemTable() {
-        assertThat(
-                        sql(
-                                "SELECT 
snapshot_id,schema_id,commit_user,commit_identifier,commit_kind FROM 
\"t1$snapshots\""))
-                .isEqualTo("[[1, 0, user, 0, APPEND]]");
-    }
-
-    @Test
-    public void testFilter() {
-        assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
-                .isEqualTo("[[1, 1], [3, 2]]");
-    }
-
-    @Test
-    public void testGroupByWithCast() {
-        assertThat(
-                        sql(
-                                "SELECT pt, a, SUM(b), SUM(d) FROM 
paimon.default.t3 GROUP BY pt, a ORDER BY pt, a"))
-                .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
-    }
-
-    @Test
-    public void testShowCreateTable() {
-        assertThat(sql("SHOW CREATE TABLE paimon.default.t3"))
-                .isEqualTo(
-                        "[[CREATE TABLE paimon.default.t3 (\n"
-                                + "   pt varchar,\n"
-                                + "   a integer,\n"
-                                + "   b bigint,\n"
-                                + "   c bigint,\n"
-                                + "   d integer\n"
-                                + ")]]");
-    }
-
-    @Test
-    public void testCreateSchema() {
-        sql("CREATE SCHEMA paimon.test");
-        assertThat(sql("SHOW SCHEMAS FROM paimon"))
-                .isEqualTo("[[default], [information_schema], [test]]");
-        sql("DROP SCHEMA paimon.test");
-    }
-
-    @Test
-    public void testDropSchema() {
-        sql("CREATE SCHEMA paimon.tpch");
-        sql("DROP SCHEMA paimon.tpch");
-        assertThat(sql("SHOW SCHEMAS FROM paimon")).isEqualTo("[[default], 
[information_schema]]");
-    }
-
-    @Test
-    public void testCreateTable() {
-        sql(
-                "CREATE TABLE orders ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[orders], [t1], [t2], [t3], [t4]]");
-        sql("DROP TABLE IF EXISTS paimon.default.orders");
-    }
-
-    @Test
-    public void testRenameTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[t1], [t2], [t3], [t4], [t6]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t6");
-    }
-
-    @Test
-    public void testDropTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-        assertThat(sql("SHOW TABLES FROM paimon.default")).isEqualTo("[[t1], 
[t2], [t3], [t4]]");
-    }
-
-    @Test
-    public void testAddColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 ADD COLUMN zip varchar");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [order_status, 
varchar(2147483646), , ], [total_price, double, , ], [order_date, date, , ], 
[zip, varchar(2147483646), , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testRenameColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME COLUMN order_status to g");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [g, varchar(2147483646), , 
], [total_price, double, , ], [order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testDropColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 DROP COLUMN order_status");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [total_price, double, , ], 
[order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testSetTableProperties() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql(
-                "ALTER TABLE paimon.default.t5 SET PROPERTIES bucket = 
'4',snapshot_time_retained = '4h'");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    private String sql(String sql) {
-        MaterializedResult result = getQueryRunner().execute(sql);
-        return result.getMaterializedRows().toString();
-    }
-}
diff --git a/paimon-trino-370/pom.xml b/paimon-trino-370/pom.xml
index a7c8cf1..ba614e2 100644
--- a/paimon-trino-370/pom.xml
+++ b/paimon-trino-370/pom.xml
@@ -73,6 +73,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-370/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
 
b/paimon-trino-370/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
deleted file mode 100644
index 621cfbc..0000000
--- 
a/paimon-trino-370/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
-    private final InnerTableWrite writer;
-    private final InnerTableCommit commit;
-
-    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
-        new SchemaManager(LocalFileIO.create(), path)
-                .createTable(
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                new HashMap<>() {
-                                    {
-                                        put("write-mode", "change-log");
-                                    }
-                                },
-                                ""));
-        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), path);
-        String user = "user";
-        this.writer = table.newWrite(user);
-        this.commit = table.newCommit(user);
-    }
-
-    public void write(InternalRow row) throws Exception {
-        writer.write(row);
-    }
-
-    public void commit() throws Exception {
-        commit.commit(0, writer.prepareCommit(true, 0));
-    }
-}
diff --git 
a/paimon-trino-370/src/test/java/org/apache/paimon/trino/TestTrino370ITCase.java
 
b/paimon-trino-370/src/test/java/org/apache/paimon/trino/TestTrino370ITCase.java
new file mode 100644
index 0000000..98e8c40
--- /dev/null
+++ 
b/paimon-trino-370/src/test/java/org/apache/paimon/trino/TestTrino370ITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+/** {@link TestTrinoITCase} for Trino 370. */
+public class TestTrino370ITCase extends TestTrinoITCase {}
diff --git 
a/paimon-trino-370/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java 
b/paimon-trino-370/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
deleted file mode 100644
index c3e94b7..0000000
--- 
a/paimon-trino-370/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import org.testng.annotations.Test;
-
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.paimon.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for trino connector. */
-public class TestTrinoITCase extends AbstractTestQueryFramework {
-
-    private static final String CATALOG = "paimon";
-    private static final String DB = "default";
-
-    @Override
-    protected QueryRunner createQueryRunner() throws Exception {
-        String warehouse =
-                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
-        // flink sink
-        Path tablePath1 = new Path(warehouse, DB + ".db/t1");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
-        testHelper1.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper1.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper1.write(
-                GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.commit();
-
-        Path tablePath2 = new Path(warehouse, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
-        testHelper2.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper2.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper2.commit();
-        testHelper2.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper2.write(GenericRow.of(7, 8L, fromString("4"), 
fromString("4")));
-        testHelper2.commit();
-
-        {
-            Path tablePath3 = new Path(warehouse, "default.db/t3");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "pt", DataTypes.STRING()),
-                                    new DataField(1, "a", new IntType()),
-                                    new DataField(2, "b", new BigIntType()),
-                                    new DataField(3, "c", new BigIntType()),
-                                    new DataField(4, "d", new IntType())));
-            new SchemaManager(LocalFileIO.create(), tablePath3)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.singletonList("pt"),
-                                    Collections.emptyList(),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(GenericRow.of(fromString("1"), 1, 1L, 1L, 1));
-            writer.write(GenericRow.of(fromString("1"), 1, 2L, 2L, 2));
-            writer.write(GenericRow.of(fromString("2"), 3, 3L, 3L, 3));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        {
-            Path tablePath4 = new Path(warehouse, "default.db/t4");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "i", new IntType()),
-                                    new DataField(
-                                            1,
-                                            "map",
-                                            new MapType(
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH),
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH)))));
-            new SchemaManager(LocalFileIO.create(), tablePath4)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.emptyList(),
-                                    Collections.singletonList("i"),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(
-                    GenericRow.of(
-                            1,
-                            new GenericMap(
-                                    new HashMap<>() {
-                                        {
-                                            put(fromString("1"), 
fromString("2"));
-                                        }
-                                    })));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        DistributedQueryRunner queryRunner = null;
-        try {
-            queryRunner =
-                    DistributedQueryRunner.builder(
-                                    
testSessionBuilder().setCatalog(CATALOG).setSchema(DB).build())
-                            .build();
-            queryRunner.installPlugin(new TrinoPlugin());
-            Map<String, String> options = new HashMap<>();
-            options.put("warehouse", warehouse);
-            queryRunner.createCatalog(CATALOG, CATALOG, options);
-            return queryRunner;
-        } catch (Throwable e) {
-            closeAllSuppress(e, queryRunner);
-            throw e;
-        }
-    }
-
-    private static SimpleTableTestHelper createTestHelper(Path tablePath) 
throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new DataField(0, "a", new IntType()),
-                                new DataField(1, "b", new BigIntType()),
-                                // test field name has upper case
-                                new DataField(2, "aCa", new VarCharType()),
-                                new DataField(3, "d", new CharType(1))));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    @Test
-    public void testComplexTypes() {
-        assertThat(sql("SELECT * FROM paimon.default.t4")).isEqualTo("[[1, 
{1=2}]]");
-    }
-
-    @Test
-    public void testProjection() {
-        assertThat(sql("SELECT * FROM paimon.default.t1"))
-                .isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]");
-        assertThat(sql("SELECT a, aCa FROM 
paimon.default.t1")).isEqualTo("[[1, 1], [5, 3]]");
-        assertThat(sql("SELECT SUM(b) FROM 
paimon.default.t1")).isEqualTo("[[8]]");
-    }
-
-    @Test
-    public void testSystemTable() {
-        assertThat(
-                        sql(
-                                "SELECT 
snapshot_id,schema_id,commit_user,commit_identifier,commit_kind FROM 
\"t1$snapshots\""))
-                .isEqualTo("[[1, 0, user, 0, APPEND]]");
-    }
-
-    @Test
-    public void testFilter() {
-        assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
-                .isEqualTo("[[1, 1], [3, 2]]");
-    }
-
-    @Test
-    public void testGroupByWithCast() {
-        assertThat(
-                        sql(
-                                "SELECT pt, a, SUM(b), SUM(d) FROM 
paimon.default.t3 GROUP BY pt, a ORDER BY pt, a"))
-                .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
-    }
-
-    @Test
-    public void testShowCreateTable() {
-        assertThat(sql("SHOW CREATE TABLE paimon.default.t3"))
-                .isEqualTo(
-                        "[[CREATE TABLE paimon.default.t3 (\n"
-                                + "   pt varchar,\n"
-                                + "   a integer,\n"
-                                + "   b bigint,\n"
-                                + "   c bigint,\n"
-                                + "   d integer\n"
-                                + ")]]");
-    }
-
-    @Test
-    public void testCreateSchema() {
-        sql("CREATE SCHEMA paimon.test");
-        assertThat(sql("SHOW SCHEMAS FROM paimon"))
-                .isEqualTo("[[default], [information_schema], [test]]");
-        sql("DROP SCHEMA paimon.test");
-    }
-
-    @Test
-    public void testDropSchema() {
-        sql("CREATE SCHEMA paimon.tpch");
-        sql("DROP SCHEMA paimon.tpch");
-        assertThat(sql("SHOW SCHEMAS FROM paimon")).isEqualTo("[[default], 
[information_schema]]");
-    }
-
-    @Test
-    public void testCreateTable() {
-        sql(
-                "CREATE TABLE orders ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[orders], [t1], [t2], [t3], [t4]]");
-        sql("DROP TABLE IF EXISTS paimon.default.orders");
-    }
-
-    @Test
-    public void testRenameTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[t1], [t2], [t3], [t4], [t6]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t6");
-    }
-
-    @Test
-    public void testDropTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-        assertThat(sql("SHOW TABLES FROM paimon.default")).isEqualTo("[[t1], 
[t2], [t3], [t4]]");
-    }
-
-    @Test
-    public void testAddColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 ADD COLUMN zip varchar");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [order_status, 
varchar(2147483646), , ], [total_price, double, , ], [order_date, date, , ], 
[zip, varchar(2147483646), , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testRenameColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME COLUMN order_status to g");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [g, varchar(2147483646), , 
], [total_price, double, , ], [order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testDropColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 DROP COLUMN order_status");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [total_price, double, , ], 
[order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testSetTableProperties() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql(
-                "ALTER TABLE paimon.default.t5 SET PROPERTIES bucket = 
'4',snapshot_time_retained = '4h'");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    private String sql(String sql) {
-        MaterializedResult result = getQueryRunner().execute(sql);
-        return result.getMaterializedRows().toString();
-    }
-}
diff --git a/paimon-trino-388/pom.xml b/paimon-trino-388/pom.xml
index 309a099..b2b2d06 100644
--- a/paimon-trino-388/pom.xml
+++ b/paimon-trino-388/pom.xml
@@ -73,6 +73,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
 
b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
new file mode 100644
index 0000000..95c10d0
--- /dev/null
+++ 
b/paimon-trino-388/src/test/java/org/apache/paimon/trino/TestTrino388ITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+/** {@link TestTrinoITCase} for Trino 388. */
+public class TestTrino388ITCase extends TestTrinoITCase {}
diff --git a/paimon-trino-388/src/test/resources/log4j2-test.properties 
b/paimon-trino-388/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..1b3980d
--- /dev/null
+++ b/paimon-trino-388/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/paimon-trino-393/pom.xml b/paimon-trino-393/pom.xml
index 991424e..8f0bd09 100644
--- a/paimon-trino-393/pom.xml
+++ b/paimon-trino-393/pom.xml
@@ -76,6 +76,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-393/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
 
b/paimon-trino-393/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
deleted file mode 100644
index 621cfbc..0000000
--- 
a/paimon-trino-393/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
-    private final InnerTableWrite writer;
-    private final InnerTableCommit commit;
-
-    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
-        new SchemaManager(LocalFileIO.create(), path)
-                .createTable(
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                new HashMap<>() {
-                                    {
-                                        put("write-mode", "change-log");
-                                    }
-                                },
-                                ""));
-        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), path);
-        String user = "user";
-        this.writer = table.newWrite(user);
-        this.commit = table.newCommit(user);
-    }
-
-    public void write(InternalRow row) throws Exception {
-        writer.write(row);
-    }
-
-    public void commit() throws Exception {
-        commit.commit(0, writer.prepareCommit(true, 0));
-    }
-}
diff --git 
a/paimon-trino-393/src/test/java/org/apache/paimon/trino/TestTrino393ITCase.java
 
b/paimon-trino-393/src/test/java/org/apache/paimon/trino/TestTrino393ITCase.java
new file mode 100644
index 0000000..beaaede
--- /dev/null
+++ 
b/paimon-trino-393/src/test/java/org/apache/paimon/trino/TestTrino393ITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+/** {@link TestTrinoITCase} for Trino 393. */
+public class TestTrino393ITCase extends TestTrinoITCase {}
diff --git 
a/paimon-trino-393/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java 
b/paimon-trino-393/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
deleted file mode 100644
index c3e94b7..0000000
--- 
a/paimon-trino-393/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.trino;
-
-import org.apache.paimon.data.GenericMap;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.InnerTableCommit;
-import org.apache.paimon.table.sink.InnerTableWrite;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import org.testng.annotations.Test;
-
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.paimon.data.BinaryString.fromString;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for trino connector. */
-public class TestTrinoITCase extends AbstractTestQueryFramework {
-
-    private static final String CATALOG = "paimon";
-    private static final String DB = "default";
-
-    @Override
-    protected QueryRunner createQueryRunner() throws Exception {
-        String warehouse =
-                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
-        // flink sink
-        Path tablePath1 = new Path(warehouse, DB + ".db/t1");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
-        testHelper1.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper1.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper1.write(
-                GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), 
fromString("2")));
-        testHelper1.commit();
-
-        Path tablePath2 = new Path(warehouse, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
-        testHelper2.write(GenericRow.of(1, 2L, fromString("1"), 
fromString("1")));
-        testHelper2.write(GenericRow.of(3, 4L, fromString("2"), 
fromString("2")));
-        testHelper2.commit();
-        testHelper2.write(GenericRow.of(5, 6L, fromString("3"), 
fromString("3")));
-        testHelper2.write(GenericRow.of(7, 8L, fromString("4"), 
fromString("4")));
-        testHelper2.commit();
-
-        {
-            Path tablePath3 = new Path(warehouse, "default.db/t3");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "pt", DataTypes.STRING()),
-                                    new DataField(1, "a", new IntType()),
-                                    new DataField(2, "b", new BigIntType()),
-                                    new DataField(3, "c", new BigIntType()),
-                                    new DataField(4, "d", new IntType())));
-            new SchemaManager(LocalFileIO.create(), tablePath3)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.singletonList("pt"),
-                                    Collections.emptyList(),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(GenericRow.of(fromString("1"), 1, 1L, 1L, 1));
-            writer.write(GenericRow.of(fromString("1"), 1, 2L, 2L, 2));
-            writer.write(GenericRow.of(fromString("2"), 3, 3L, 3L, 3));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        {
-            Path tablePath4 = new Path(warehouse, "default.db/t4");
-            RowType rowType =
-                    new RowType(
-                            Arrays.asList(
-                                    new DataField(0, "i", new IntType()),
-                                    new DataField(
-                                            1,
-                                            "map",
-                                            new MapType(
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH),
-                                                    new 
VarCharType(VarCharType.MAX_LENGTH)))));
-            new SchemaManager(LocalFileIO.create(), tablePath4)
-                    .createTable(
-                            new Schema(
-                                    rowType.getFields(),
-                                    Collections.emptyList(),
-                                    Collections.singletonList("i"),
-                                    new HashMap<>(),
-                                    ""));
-            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
-            InnerTableWrite writer = table.newWrite("user");
-            InnerTableCommit commit = table.newCommit("user");
-            writer.write(
-                    GenericRow.of(
-                            1,
-                            new GenericMap(
-                                    new HashMap<>() {
-                                        {
-                                            put(fromString("1"), 
fromString("2"));
-                                        }
-                                    })));
-            commit.commit(0, writer.prepareCommit(true, 0));
-        }
-
-        DistributedQueryRunner queryRunner = null;
-        try {
-            queryRunner =
-                    DistributedQueryRunner.builder(
-                                    
testSessionBuilder().setCatalog(CATALOG).setSchema(DB).build())
-                            .build();
-            queryRunner.installPlugin(new TrinoPlugin());
-            Map<String, String> options = new HashMap<>();
-            options.put("warehouse", warehouse);
-            queryRunner.createCatalog(CATALOG, CATALOG, options);
-            return queryRunner;
-        } catch (Throwable e) {
-            closeAllSuppress(e, queryRunner);
-            throw e;
-        }
-    }
-
-    private static SimpleTableTestHelper createTestHelper(Path tablePath) 
throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new DataField(0, "a", new IntType()),
-                                new DataField(1, "b", new BigIntType()),
-                                // test field name has upper case
-                                new DataField(2, "aCa", new VarCharType()),
-                                new DataField(3, "d", new CharType(1))));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    @Test
-    public void testComplexTypes() {
-        assertThat(sql("SELECT * FROM paimon.default.t4")).isEqualTo("[[1, 
{1=2}]]");
-    }
-
-    @Test
-    public void testProjection() {
-        assertThat(sql("SELECT * FROM paimon.default.t1"))
-                .isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]");
-        assertThat(sql("SELECT a, aCa FROM 
paimon.default.t1")).isEqualTo("[[1, 1], [5, 3]]");
-        assertThat(sql("SELECT SUM(b) FROM 
paimon.default.t1")).isEqualTo("[[8]]");
-    }
-
-    @Test
-    public void testSystemTable() {
-        assertThat(
-                        sql(
-                                "SELECT 
snapshot_id,schema_id,commit_user,commit_identifier,commit_kind FROM 
\"t1$snapshots\""))
-                .isEqualTo("[[1, 0, user, 0, APPEND]]");
-    }
-
-    @Test
-    public void testFilter() {
-        assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
-                .isEqualTo("[[1, 1], [3, 2]]");
-    }
-
-    @Test
-    public void testGroupByWithCast() {
-        assertThat(
-                        sql(
-                                "SELECT pt, a, SUM(b), SUM(d) FROM 
paimon.default.t3 GROUP BY pt, a ORDER BY pt, a"))
-                .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
-    }
-
-    @Test
-    public void testShowCreateTable() {
-        assertThat(sql("SHOW CREATE TABLE paimon.default.t3"))
-                .isEqualTo(
-                        "[[CREATE TABLE paimon.default.t3 (\n"
-                                + "   pt varchar,\n"
-                                + "   a integer,\n"
-                                + "   b bigint,\n"
-                                + "   c bigint,\n"
-                                + "   d integer\n"
-                                + ")]]");
-    }
-
-    @Test
-    public void testCreateSchema() {
-        sql("CREATE SCHEMA paimon.test");
-        assertThat(sql("SHOW SCHEMAS FROM paimon"))
-                .isEqualTo("[[default], [information_schema], [test]]");
-        sql("DROP SCHEMA paimon.test");
-    }
-
-    @Test
-    public void testDropSchema() {
-        sql("CREATE SCHEMA paimon.tpch");
-        sql("DROP SCHEMA paimon.tpch");
-        assertThat(sql("SHOW SCHEMAS FROM paimon")).isEqualTo("[[default], 
[information_schema]]");
-    }
-
-    @Test
-    public void testCreateTable() {
-        sql(
-                "CREATE TABLE orders ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[orders], [t1], [t2], [t3], [t4]]");
-        sql("DROP TABLE IF EXISTS paimon.default.orders");
-    }
-
-    @Test
-    public void testRenameTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
-        assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[t1], [t2], [t3], [t4], [t6]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t6");
-    }
-
-    @Test
-    public void testDropTable() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-        assertThat(sql("SHOW TABLES FROM paimon.default")).isEqualTo("[[t1], 
[t2], [t3], [t4]]");
-    }
-
-    @Test
-    public void testAddColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 ADD COLUMN zip varchar");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [order_status, 
varchar(2147483646), , ], [total_price, double, , ], [order_date, date, , ], 
[zip, varchar(2147483646), , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testRenameColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 RENAME COLUMN order_status to g");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [g, varchar(2147483646), , 
], [total_price, double, , ], [order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testDropColumn() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql("ALTER TABLE paimon.default.t5 DROP COLUMN order_status");
-        assertThat(sql("SHOW COLUMNS FROM paimon.default.t5"))
-                .isEqualTo(
-                        "[[order_key, bigint, , ], [total_price, double, , ], 
[order_date, date, , ]]");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    @Test
-    public void testSetTableProperties() {
-        sql(
-                "CREATE TABLE t5 ("
-                        + "  order_key bigint,"
-                        + "  order_status varchar,"
-                        + "  total_price double,"
-                        + "  order_date date"
-                        + ")"
-                        + "WITH ("
-                        + "file_format = 'ORC',"
-                        + "primary_key = ARRAY['order_key','order_date'],"
-                        + "partitioned_by = ARRAY['order_date'],"
-                        + "bucket = '2',"
-                        + "bucket_key = 'order_key',"
-                        + "changelog_producer = 'input'"
-                        + ")");
-        sql(
-                "ALTER TABLE paimon.default.t5 SET PROPERTIES bucket = 
'4',snapshot_time_retained = '4h'");
-        sql("DROP TABLE IF EXISTS paimon.default.t5");
-    }
-
-    private String sql(String sql) {
-        MaterializedResult result = getQueryRunner().execute(sql);
-        return result.getMaterializedRows().toString();
-    }
-}
diff --git a/paimon-trino-393/pom.xml b/paimon-trino-422/pom.xml
similarity index 94%
copy from paimon-trino-393/pom.xml
copy to paimon-trino-422/pom.xml
index 991424e..d5eb330 100644
--- a/paimon-trino-393/pom.xml
+++ b/paimon-trino-422/pom.xml
@@ -28,13 +28,13 @@ under the License.
         <version>0.5-SNAPSHOT</version>
     </parent>
 
-    <artifactId>paimon-trino-393</artifactId>
-    <name>Paimon : Trino : 393</name>
+    <artifactId>paimon-trino-422</artifactId>
+    <name>Paimon : Trino : 422</name>
 
     <properties>
         <target.java.version>17</target.java.version>
         <jdk.test.version>17</jdk.test.version>
-        <trino.version>393</trino.version>
+        <trino.version>422</trino.version>
         <hadoop.version>2.8.5</hadoop.version>
         <configuration.version>216</configuration.version>
         <slice.version>0.42</slice.version>
@@ -76,6 +76,15 @@ under the License.
         </dependency>
 
         <!-- for testing -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-trino-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-client</artifactId>
diff --git 
a/paimon-trino-422/src/test/java/org/apache/paimon/trino/TestTrino422ITCase.java
 
b/paimon-trino-422/src/test/java/org/apache/paimon/trino/TestTrino422ITCase.java
new file mode 100644
index 0000000..4341754
--- /dev/null
+++ 
b/paimon-trino-422/src/test/java/org/apache/paimon/trino/TestTrino422ITCase.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.trino;
+
+/** {@link TestTrinoITCase} for Trino 422. */
+public class TestTrino422ITCase extends TestTrinoITCase {}
diff --git a/paimon-trino-422/src/test/resources/log4j2-test.properties 
b/paimon-trino-422/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..1b3980d
--- /dev/null
+++ b/paimon-trino-422/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/paimon-trino-common/pom.xml b/paimon-trino-common/pom.xml
index 166a249..dc11829 100644
--- a/paimon-trino-common/pom.xml
+++ b/paimon-trino-common/pom.xml
@@ -137,6 +137,18 @@ under the License.
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
diff --git 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index c3e94b7..29e6a14 100644
--- 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -57,7 +57,7 @@ import static org.apache.paimon.data.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for trino connector. */
-public class TestTrinoITCase extends AbstractTestQueryFramework {
+public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
 
     private static final String CATALOG = "paimon";
     private static final String DB = "default";
diff --git a/pom.xml b/pom.xml
index 91824c5..4df48ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,13 +22,14 @@ under the License.
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <modules>
-        <module>paimon-trino-common</module>
-        <module>paimon-trino-388</module>
         <module>paimon-trino-358</module>
-        <module>paimon-trino-393</module>
         <module>paimon-trino-368</module>
         <module>paimon-trino-369</module>
         <module>paimon-trino-370</module>
+        <module>paimon-trino-388</module>
+        <module>paimon-trino-393</module>
+        <module>paimon-trino-422</module>
+        <module>paimon-trino-common</module>
     </modules>
 
     <parent>

Reply via email to