This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cac2bd126 [Feature][Connector-V2][JDBC] Add presto/trino dialect  
(#9388)
3cac2bd126 is described below

commit 3cac2bd126456b424c0c6a5045371f62c2c65b34
Author: dyp12 <[email protected]>
AuthorDate: Tue Jun 3 11:55:59 2025 +0800

    [Feature][Connector-V2][JDBC] Add presto/trino dialect  (#9388)
---
 docs/en/connector-v2/source/Jdbc.md                |   2 +
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  22 ++
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |   1 +
 .../internal/dialect/presto/PrestoDialect.java     |  48 +++++
 .../dialect/presto/PrestoDialectFactory.java       |  44 ++++
 .../dialect/presto/PrestoJdbcRowConverter.java     |  47 +++++
 .../internal/dialect/presto/PrestoTypeMapper.java  | 116 +++++++++++
 seatunnel-dist/pom.xml                             |  14 ++
 .../src/main/assembly/assembly-bin-ci.xml          |   2 +
 .../connector-jdbc-e2e-part-7/pom.xml              |  11 +
 .../connectors/seatunnel/jdbc/JdbcPrestoIT.java    | 213 +++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcTrinoIT.java     | 231 +++++++++++++++++++++
 .../resources/jdbc_presto_source_and_assert.conf   | 154 ++++++++++++++
 .../resources/jdbc_trino_source_and_assert.conf    | 154 ++++++++++++++
 .../container/seatunnel/SeaTunnelContainer.java    |   2 +-
 15 files changed, 1060 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 3d2d6b995a..88637bd2cc 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -163,6 +163,8 @@ there are some reference value for params above.
 | InterSystems IRIS | com.intersystems.jdbc.IRISDriver                    | 
jdbc:IRIS://localhost:1972/%SYS                                        | 
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
 |
 | opengauss         | org.opengauss.Driver                                | 
jdbc:opengauss://localhost:5432/postgres                               | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
                              |
 | Highgo            | com.highgo.jdbc.Driver                              | 
jdbc:highgo://localhost:5866/highgo                                    | 
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar     
                                              |
+| Presto            | com.facebook.presto.jdbc.PrestoDriver               | 
jdbc:presto://localhost:8080/presto                                    | 
https://repo1.maven.org/maven2/com/facebook/presto/presto-jdbc/0.279/presto-jdbc-0.279.jar
                                    |
+| Trino             | io.trino.jdbc.TrinoDriver                           | 
jdbc:trino://localhost:8080/trino                                      | 
https://repo1.maven.org/maven2/io/trino/trino-jdbc/460/trino-jdbc-460.jar       
                                              |
 
 ## Example
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 47fd43e6d8..87705fe01a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -56,6 +56,8 @@
         <opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
         <mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>
         <highgo.version>6.2.3</highgo.version>
+        <presto.version>0.279</presto.version>
+        <trino.version>460</trino.version>
     </properties>
 
     <dependencyManagement>
@@ -229,6 +231,18 @@
                 <version>${highgo.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>com.facebook.presto</groupId>
+                <artifactId>presto-jdbc</artifactId>
+                <version>${presto.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>io.trino</groupId>
+                <artifactId>trino-jdbc</artifactId>
+                <version>${trino.version}</version>
+                <scope>provided</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -352,6 +366,14 @@
             <groupId>com.highgo</groupId>
             <artifactId>HgdbJdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.facebook.presto</groupId>
+            <artifactId>presto-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-jdbc</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index cf503be4d1..17f672213c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -46,4 +46,5 @@ public class DatabaseIdentifier {
     public static final String OPENGAUSS = "OpenGauss";
     public static final String HIGHGO = "Highgo";
     public static final String GREENPLUM = "Greenplum";
+    public static final String PRESTO = "Presto";
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialect.java
new file mode 100644
index 0000000000..df3e8f5abf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialect.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Optional;
+
+/**
+ * {@link JdbcDialect} implementation identified by {@link DatabaseIdentifier#PRESTO}.
+ *
+ * <p>It hands out the Presto-specific row converter and type mapper and provides no upsert
+ * statement.
+ */
+public class PrestoDialect implements JdbcDialect {
+    /** Returns the dialect name, {@link DatabaseIdentifier#PRESTO}. */
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.PRESTO;
+    }
+
+    /** Returns a newly created {@link PrestoJdbcRowConverter}. */
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new PrestoJdbcRowConverter();
+    }
+
+    /** Returns a newly created {@link PrestoTypeMapper}. */
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new PrestoTypeMapper();
+    }
+
+    /**
+     * Always returns {@link Optional#empty()}; none of the arguments are inspected.
+     *
+     * @param database unused
+     * @param tableName unused
+     * @param fieldNames unused
+     * @param uniqueKeyFields unused
+     * @return an empty {@link Optional}
+     */
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
+        return Optional.empty();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialectFactory.java
new file mode 100644
index 0000000000..3b714ffeba
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialectFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+/**
+ * {@link JdbcDialectFactory} that creates {@link PrestoDialect} instances.
+ *
+ * <p>The same dialect is used for both Presto and Trino: {@link #acceptsURL(String)} matches
+ * either URL prefix.
+ */
+@AutoService(JdbcDialectFactory.class)
+public class PrestoDialectFactory implements JdbcDialectFactory {
+
+    /** Returns the factory name, {@link DatabaseIdentifier#PRESTO}. */
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.PRESTO;
+    }
+
+    /**
+     * Tells whether this factory handles the given JDBC URL.
+     *
+     * @param url JDBC connection URL
+     * @return {@code true} when the URL starts with {@code jdbc:presto:} or {@code jdbc:trino:}
+     */
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        return url.startsWith("jdbc:presto:") || url.startsWith("jdbc:trino:");
+    }
+
+    /** Returns a newly created {@link PrestoDialect}. */
+    @Override
+    public JdbcDialect create() {
+        return new PrestoDialect();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoJdbcRowConverter.java
new file mode 100644
index 0000000000..01d5ba6e07
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoJdbcRowConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import javax.annotation.Nullable;
+
+import java.sql.PreparedStatement;
+
+/**
+ * Row converter for the Presto dialect.
+ *
+ * <p>Only {@link #toExternal} is overridden here, and it rejects every call, so this converter
+ * cannot be used to write rows.
+ */
+public class PrestoJdbcRowConverter extends AbstractJdbcRowConverter {
+    /** Returns the converter name, {@link DatabaseIdentifier#PRESTO}. */
+    @Override
+    public String converterName() {
+        return DatabaseIdentifier.PRESTO;
+    }
+
+    /**
+     * Always fails: writing through this converter is not supported.
+     *
+     * @throws JdbcConnectorException with {@link JdbcConnectorErrorCode#DONT_SUPPORT_SINK} on
+     *     every invocation; none of the arguments are inspected
+     */
+    @Override
+    public PreparedStatement toExternal(
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement) {
+        throw new JdbcConnectorException(
+                JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
+                "The Presto jdbc connector don't support sink");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoTypeMapper.java
new file mode 100644
index 0000000000..2ef5fa921a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoTypeMapper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonError;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class PrestoTypeMapper implements JdbcDialectTypeMapper {
+    // ============================data types=====================
+
+    private static final String PRESTO_BOOLEAN = "BOOLEAN";
+
+    // -------------------------Structural----------------------------
+    private static final String PRESTO_ARRAY = "ARRAY";
+    private static final String PRESTO_MAP = "MAP";
+    private static final String PRESTO_ROW = "ROW";
+
+    // -------------------------number----------------------------
+    private static final String PRESTO_TINYINT = "TINYINT";
+    private static final String PRESTO_SMALLINT = "SMALLINT";
+    private static final String PRESTO_INTEGER = "INTEGER";
+    private static final String PRESTO_BIGINT = "BIGINT";
+    private static final String PRESTO_DECIMAL = "DECIMAL";
+    private static final String PRESTO_REAL = "REAL";
+    private static final String PRESTO_DOUBLE = "DOUBLE";
+
+    // -------------------------string----------------------------
+    private static final String PRESTO_CHAR = "CHAR";
+    private static final String PRESTO_VARCHAR = "VARCHAR";
+    private static final String PRESTO_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String PRESTO_DATE = "DATE";
+    private static final String PRESTO_TIME = "TIME";
+    private static final String PRESTO_TIMESTAMP = "TIMESTAMP";
+
+    // ------------------------------blob-------------------------
+    private static final String PRESTO_BINARY = "BINARY";
+    private static final String PRESTO_VARBINARY = "VARBINARY";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int 
colIndex)
+            throws SQLException {
+        String columnType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        // VARCHAR(x)      --->      VARCHAR
+        if (columnType.indexOf("(") > -1) {
+            columnType = columnType.split("\\(")[0];
+        }
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (columnType) {
+            case PRESTO_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case PRESTO_TINYINT:
+                return BasicType.BYTE_TYPE;
+            case PRESTO_INTEGER:
+                return BasicType.INT_TYPE;
+            case PRESTO_SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case PRESTO_BIGINT:
+                return BasicType.LONG_TYPE;
+            case PRESTO_DECIMAL:
+                return new DecimalType(precision, scale);
+            case PRESTO_REAL:
+                return BasicType.FLOAT_TYPE;
+            case PRESTO_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case PRESTO_CHAR:
+            case PRESTO_VARCHAR:
+            case PRESTO_JSON:
+                return BasicType.STRING_TYPE;
+            case PRESTO_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case PRESTO_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case PRESTO_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case PRESTO_VARBINARY:
+            case PRESTO_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+                // Doesn't support yet
+            case PRESTO_MAP:
+            case PRESTO_ARRAY:
+            case PRESTO_ROW:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw CommonError.convertToSeaTunnelTypeError(
+                        DatabaseIdentifier.PRESTO, columnType, jdbcColumnName);
+        }
+    }
+}
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index a747887c59..9bc14f76db 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -111,6 +111,8 @@
                 <aliyun.sdk.oss.version>3.4.1</aliyun.sdk.oss.version>
                 <jdom.version>1.1</jdom.version>
                 <tidb.version>3.3.5</tidb.version>
+                <presto.version>0.279</presto.version>
+                <trino.version>460</trino.version>
             </properties>
             <dependencies>
                 <!-- starters -->
@@ -982,6 +984,18 @@
                     <classifier>optional</classifier>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>com.facebook.presto</groupId>
+                    <artifactId>presto-jdbc</artifactId>
+                    <version>${presto.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+                <dependency>
+                    <groupId>io.trino</groupId>
+                    <artifactId>trino-jdbc</artifactId>
+                    <version>${trino.version}</version>
+                    <scope>provided</scope>
+                </dependency>
             </dependencies>
             <build>
                 <finalName>apache-seatunnel-${project.version}</finalName>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml 
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 2ed7f180b7..2e056a1afb 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -195,6 +195,8 @@
                 <include>org.apache.hive:hive-exec:jar</include>
                 <include>org.apache.hive:hive-service:jar</include>
                 <include>org.apache.thrift:libfb303:jar</include>
+                <include>com.facebook.presto:presto-jdbc:jar</include>
+                <include>io.trino:trino-jdbc:jar</include>
             </includes>
             
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
             <outputDirectory>/lib</outputDirectory>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
index d9ec7df954..9fb8c4f055 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
@@ -111,6 +111,17 @@
             <artifactId>HgdbJdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.facebook.presto</groupId>
+            <artifactId>presto-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPrestoIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPrestoIT.java
new file mode 100644
index 0000000000..2200d4bd4a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPrestoIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * JDBC end-to-end test case that runs against a {@code prestodb/presto} container and uses the
+ * {@code memory} catalog.
+ *
+ * <p>The source table is created from {@link #CREATE_SQL} and filled by {@link
+ * #insertTestData()}; the job definition is {@code /jdbc_presto_source_and_assert.conf}.
+ */
+@Slf4j
+public class JdbcPrestoIT extends AbstractJdbcIT {
+    protected static final String PRESTO_IMAGE = "prestodb/presto";
+
+    private static final String PRESTO_ALIASES = "e2e-presto";
+    private static final String DRIVER_CLASS = "com.facebook.presto.jdbc.PrestoDriver";
+    private static final int PRESTO_PORT = 18080;
+    // Single placeholder: the port.
+    private static final String PRESTO_URL = "jdbc:presto://" + HOST + ":%s/memory?timeZoneId=UTC";
+    private static final String USERNAME = "presto";
+    private static final String DATABASE = "memory.default";
+    private static final String SOURCE_TABLE = "presto_e2e_source_table";
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_presto_source_and_assert.conf");
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s (\n"
+                    + "  id                     BIGINT,\n"
+                    + "boolean_col              BOOLEAN,\n"
+                    + "tinyint_col              TINYINT,\n"
+                    + "smallint_col             SMALLINT,\n"
+                    + "integer_col              INTEGER,\n"
+                    + "bigint_col               BIGINT,\n"
+                    + "decimal_col              DECIMAL(22,4),\n"
+                    + "real_col                 REAL,\n"
+                    + "double_col               DOUBLE,\n"
+                    + "char_col                 CHAR,\n"
+                    + "varchar_col              VARCHAR,\n"
+                    + "date_col                 DATE,\n"
+                    + "time_col                 TIME,\n"
+                    + "timestamp_col            TIMESTAMP,\n"
+                    + "varbinary_col            VARBINARY,\n"
+                    + "json_col                 json\n"
+                    + ")";
+
+    /**
+     * Downloads the JDBC driver jar from {@link #driverUrl()} into {@code
+     * /tmp/seatunnel/plugins/Jdbc/lib} inside the given container and asserts that the shell
+     * command exited with status 0.
+     */
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    /**
+     * Opens the JDBC connection and probes it with {@code select 1} until a probe succeeds or
+     * five attempts have been made, sleeping 15 seconds after each failed attempt.
+     *
+     * <p>If no probe succeeds the method still continues; later statements will surface the
+     * failure.
+     *
+     * @param jdbcUrl JDBC URL; occurrences of {@code HOST} are replaced with the container host
+     *     when {@code dbServer} is set
+     * @throws RuntimeException if the waiting thread is interrupted (the interrupt flag is
+     *     restored first)
+     */
+    @Override
+    protected void initializeJdbcConnection(String jdbcUrl)
+            throws SQLException, InstantiationException, IllegalAccessException {
+        Driver driver = (Driver) loadDriverClass().newInstance();
+        Properties props = new Properties();
+
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
+
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        if (dbServer != null) {
+            jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+        }
+
+        this.connection = driver.connect(jdbcUrl, props);
+
+        // maybe the Presto server is still initializing
+        int tryTimes = 5;
+        for (int i = 0; i < tryTimes; i++) {
+            try (Statement statement = connection.createStatement()) {
+                statement.executeQuery(" select 1 ");
+                break;
+            } catch (SQLException ignored) {
+                // Expected while the server is starting up; retry after the pause below.
+                log.info("the Presto server is still initializing. wait it ");
+            }
+            try {
+                Thread.sleep(15 * 1000);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up can observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        this.connection.setAutoCommit(false);
+    }
+
+    /** Builds the {@link JdbcCase} describing the Presto container, table and job config. */
+    @Override
+    JdbcCase getJdbcCase() {
+        // PRESTO_URL has exactly one placeholder (the port).
+        String jdbcUrl = String.format(PRESTO_URL, PRESTO_PORT);
+        return JdbcCase.builder()
+                .dockerImage(PRESTO_IMAGE)
+                .networkAliases(PRESTO_ALIASES)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(PRESTO_PORT)
+                .localPort(PRESTO_PORT)
+                .jdbcTemplate(PRESTO_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .database(DATABASE)
+                .sourceTable(SOURCE_TABLE)
+                .catalogDatabase(DATABASE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .useSaveModeCreateTable(true)
+                .build();
+    }
+
+    /**
+     * Inserts the same literal row three times into {@code
+     * memory.default.presto_e2e_source_table}.
+     *
+     * @throws SeaTunnelRuntimeException with {@code INSERT_DATA_FAILED} if any insert fails
+     */
+    @Override
+    protected void insertTestData() {
+        try (Statement statement = connection.createStatement()) {
+            for (int i = 1; i <= 3; i++) {
+                statement.execute(
+                        "insert into memory.default.presto_e2e_source_table\n"
+                                + "values(\n"
+                                + "1,\n"
+                                + "true,\n"
+                                + "cast(127 as tinyint),\n"
+                                + "cast(32767 as smallint),\n"
+                                + "3,\n"
+                                + "1234567890,\n"
+                                + "55.0005,\n"
+                                + "67.89,\n"
+                                + "123.45,\n"
+                                + "'8',\n"
+                                + "'VarcharCol',\n"
+                                + "date '2024-01-01',\n"
+                                + "time '12:01:01',\n"
+                                + "timestamp '2024-01-01 12:01:01',\n"
+                                + "VARBINARY 'str',\n"
+                                + "json '{\"key\":\"val\"}'\n"
+                                + ")");
+            }
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+        }
+    }
+
+    /** Returns the Maven Central download URL of the Presto JDBC driver jar (version 0.279). */
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/com/facebook/presto/presto-jdbc/0.279/presto-jdbc-0.279.jar";
+    }
+
+    /** Returns {@code null}; rows are written by {@link #insertTestData()} instead. */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        return null;
+    }
+
+    /** Returns the field name unchanged, i.e. identifiers are not quoted. */
+    @Override
+    public String quoteIdentifier(String field) {
+        return field;
+    }
+
+    /** Intentionally a no-op. */
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        // do nothing.
+    }
+
+    /**
+     * Creates the Presto container on the shared test network and binds host port {@link
+     * #PRESTO_PORT} to container port 8080.
+     */
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(PRESTO_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PRESTO_ALIASES)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PRESTO_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", PRESTO_PORT, "8080")));
+
+        return container;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTrinoIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTrinoIT.java
new file mode 100644
index 0000000000..acd4d3145d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTrinoIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+@Slf4j
+public class JdbcTrinoIT extends AbstractJdbcIT {
+    // Docker image of the Trino server under test (no explicit tag is given here).
+    protected static final String TRINO_IMAGE = "trinodb/trino";
+
+    // Network alias assigned to the Trino container.
+    private static final String TRINO_ALIASES = "e2e-trino";
+    private static final String DRIVER_CLASS = "io.trino.jdbc.TrinoDriver";
+    // Port used in the test JDBC URL; initContainer binds it to port 8080.
+    private static final int TRINO_PORT = 28080;
+    // JDBC URL template; its single %s placeholder is filled with the port in getJdbcCase.
+    private static final String TRINO_URL = "jdbc:trino://" + HOST + 
":%s/memory?timezone=UTC";
+    private static final String USERNAME = "trino";
+    private static final String DATABASE = "memory.default";
+    private static final String SOURCE_TABLE = "trino_e2e_source_table";
+    // Job configuration file(s) handed to the JdbcCase in getJdbcCase.
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_trino_source_and_assert.conf");
+
+    // DDL template for the source table; %s is replaced with the table path in
+    // createNeededTables.
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s (\n"
+                    + "  id                     BIGINT,\n"
+                    + "boolean_col              BOOLEAN,\n"
+                    + "tinyint_col              TINYINT,\n"
+                    + "smallint_col             SMALLINT,\n"
+                    + "integer_col              INTEGER,\n"
+                    + "bigint_col               BIGINT,\n"
+                    + "decimal_col              DECIMAL(22,4),\n"
+                    + "real_col                 REAL,\n"
+                    + "double_col               DOUBLE,\n"
+                    + "char_col                 CHAR,\n"
+                    + "varchar_col              VARCHAR,\n"
+                    + "date_col                 DATE,\n"
+                    + "time_col                 TIME,\n"
+                    + "timestamp_col            TIMESTAMP,\n"
+                    + "varbinary_col            VARBINARY,\n"
+                    + "json_col                 json\n"
+                    + ")";
+
+    // Container extension: inside the given container, creates the Jdbc plugin lib directory
+    // and downloads the driver jar from driverUrl() into it with curl, then asserts that the
+    // shell command exited with code 0 (stderr is used as the assertion message).
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
+            };
+
+    @Override
+    protected void initializeJdbcConnection(String jdbcUrl)
+            throws SQLException, InstantiationException, 
IllegalAccessException {
+        Driver driver = (Driver) loadDriverClass().newInstance();
+        Properties props = new Properties();
+
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
+
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        if (dbServer != null) {
+            jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+        }
+
+        this.connection = driver.connect(jdbcUrl, props);
+
+        // maybe the TRINO  server is still initializing
+        int tryTimes = 5;
+        for (int i = 0; i < tryTimes; i++) {
+            try (Statement statement = connection.createStatement()) {
+                statement.executeQuery(" select 1 ");
+                break;
+            } catch (SQLException ignored) {
+                log.info("the Trino server is still initializing. wait it ");
+            }
+            try {
+                Thread.sleep(15 * 1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Creates the source table by filling the create-SQL template of {@code jdbcCase} with the
+     * table path built from its database, schema and source table, then executing the result.
+     *
+     * @throws SeaTunnelRuntimeException with {@code CREATE_TABLE_FAILED} if anything goes wrong
+     */
+    @Override
+    protected void createNeededTables() {
+        try (Statement ddlStatement = connection.createStatement()) {
+            String createTableSql =
+                    String.format(
+                            jdbcCase.getCreateSql(),
+                            buildTableInfoWithSchema(
+                                    jdbcCase.getDatabase(),
+                                    jdbcCase.getSchema(),
+                                    jdbcCase.getSourceTable()));
+            ddlStatement.execute(createTableSql);
+        } catch (Exception exception) {
+            // Log the message, then rethrow with the original exception kept as the cause.
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+    }
+
+    /**
+     * Describes the Trino test case: image, network alias, driver, connection settings, the
+     * source table with its DDL, and the job configuration file(s).
+     *
+     * @return the fully populated {@code JdbcCase}
+     */
+    @Override
+    JdbcCase getJdbcCase() {
+        // TRINO_URL has a single %s placeholder (the port), so only the port is passed.
+        String jdbcUrl = String.format(TRINO_URL, TRINO_PORT);
+        return JdbcCase.builder()
+                .dockerImage(TRINO_IMAGE)
+                .networkAliases(TRINO_ALIASES)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(TRINO_PORT)
+                .localPort(TRINO_PORT)
+                .jdbcTemplate(TRINO_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .database(DATABASE)
+                .sourceTable(SOURCE_TABLE)
+                .catalogDatabase(DATABASE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .useSaveModeCreateTable(true)
+                .build();
+    }
+
+    @Override
+    protected void insertTestData() {
+        try (Statement statement = connection.createStatement()) {
+            for (int i = 1; i <= 3; i++) {
+                statement.execute(
+                        "insert into memory.default.trino_e2e_source_table\n"
+                                + "values(\n"
+                                + "1,\n"
+                                + "true,\n"
+                                + "cast(127 as tinyint),\n"
+                                + "cast(32767 as smallint),\n"
+                                + "3,\n"
+                                + "1234567890,\n"
+                                + "55.0005,\n"
+                                + "67.89,\n"
+                                + "123.45,\n"
+                                + "'8',\n"
+                                + "'VarcharCol',\n"
+                                + "date '2024-01-01',\n"
+                                + "time '12:01:01',\n"
+                                + "timestamp '2024-01-01 12:01:01',\n"
+                                + "VARBINARY 'str',\n"
+                                + "json '{\"key\":\"val\"}'\n"
+                                + ")");
+            }
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+        }
+    }
+
+    /**
+     * Returns the download URL of the Trino JDBC driver jar (trino-jdbc 460); the container
+     * extension of this class downloads it with curl.
+     *
+     * @return the driver jar URL
+     */
+    @Override
+    String driverUrl() {
+        // Exactly one statement terminator: a second ';' after a return is an unreachable empty
+        // statement, which the Java compiler rejects.
+        return "https://repo1.maven.org/maven2/io/trino/trino-jdbc/460/trino-jdbc-460.jar";
+    }
+
+    /**
+     * Supplies no pre-built test rows: this implementation always returns {@code null}. The rows
+     * of this test are written with SQL in {@code insertTestData()}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        return null;
+    }
+
+    /**
+     * Returns the given field name unchanged; no quoting is applied.
+     *
+     * @param field the field name
+     * @return {@code field} as passed in
+     */
+    @Override
+    public String quoteIdentifier(String field) {
+        return field;
+    }
+
+    /**
+     * Intentionally a no-op: all three arguments are ignored and nothing is cleared.
+     */
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        // do nothing.
+    }
+
+    /**
+     * Builds the Trino server container: created from {@code TRINO_IMAGE}, attached to {@code
+     * NETWORK} under {@code TRINO_ALIASES}, logging through SLF4J, with {@code TRINO_PORT} bound
+     * to port 8080.
+     *
+     * @return the configured container
+     */
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> trinoServer = new GenericContainer<>(TRINO_IMAGE);
+        trinoServer.withNetwork(NETWORK);
+        trinoServer.withNetworkAliases(TRINO_ALIASES);
+        trinoServer.withLogConsumer(
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(TRINO_IMAGE)));
+
+        // Binding is written as "<TRINO_PORT>:8080".
+        String portBinding = String.format("%s:%s", TRINO_PORT, "8080");
+        trinoServer.setPortBindings(Lists.newArrayList(portBinding));
+        return trinoServer;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_presto_source_and_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_presto_source_and_assert.conf
new file mode 100644
index 0000000000..981570c91c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_presto_source_and_assert.conf
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:presto://e2e-Presto:8080/memory?timeZoneId=UTC"
+    driver = "com.facebook.presto.jdbc.PrestoDriver"
+    connection_check_timeout_sec = 100
+    user = "presto"
+    query = "select * from memory.default.presto_e2e_source_table"
+    split.size = 10
+  }
+}
+
+transform {
+}
+
+
+
+sink {
+assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 3
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 3
+          }
+        ],
+        field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [{equals_to = 1}]
+        },
+        {
+          field_name = boolean_col
+          field_type = boolean
+          field_value = [{equals_to = "TRUE"}]
+        },
+        {
+          field_name = tinyint_col
+          field_type = tinyint
+          field_value = [{equals_to = 127}]
+        },
+        {
+          field_name = smallint_col
+          field_type = smallint
+          field_value = [{equals_to = 32767}]
+         },
+        {
+          field_name = integer_col
+          field_type = int
+          field_value = [{equals_to = 3}]
+         },
+        {
+          field_name = bigint_col
+          field_type = long
+          field_value = [{equals_to = 1234567890}]
+          },
+        {
+          field_name = decimal_col
+          field_type = "decimal(22,4)"
+          field_value = [{equals_to = "55.0005"}]
+          },
+        {
+          field_name = real_col
+          field_type = float
+          field_value = [{equals_to = 67.89}]
+          },
+        {
+          field_name = double_col
+          field_type = double
+          field_value = [{equals_to = 123.45}]
+          },
+        {
+          field_name = char_col
+          field_type = string
+          field_value = [{equals_to = "8"}]
+          },
+        {
+          field_name = varchar_col
+          field_type = string
+          field_value = [{equals_to = "VarcharCol"}]
+          },
+        {
+          field_name = date_col
+          field_type = date
+          field_value = [{equals_to = "2024-01-01"}]
+          },
+        {
+          field_name = time_col
+          field_type = time
+          field_value = [{equals_to = "12:01:01"}]
+          },
+        {
+          field_name = timestamp_col
+          field_type = timestamp
+          field_value = [{equals_to = "2024-01-01T12:01:01"}]
+          },
+        {
+          field_name = varbinary_col
+          field_type = bytes
+          field_value = [{equals_to = "c3Ry"}]
+          },
+        {
+         field_name = json_col
+          field_type = string
+          field_value = [{equals_to = "{\"key\":\"val\"}"}]
+         }
+        ]
+      }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_trino_source_and_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_trino_source_and_assert.conf
new file mode 100644
index 0000000000..69599639ce
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_trino_source_and_assert.conf
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:trino://e2e-trino:8080/memory?timezone=UTC"
+    driver = "io.trino.jdbc.TrinoDriver"
+    connection_check_timeout_sec = 100
+    user = "trino"
+    query = "select * from memory.default.trino_e2e_source_table"
+    split.size = 10
+  }
+}
+
+transform {
+}
+
+
+
+sink {
+assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 3
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 3
+          }
+        ],
+        field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [{equals_to = 1}]
+        },
+        {
+          field_name = boolean_col
+          field_type = boolean
+          field_value = [{equals_to = "TRUE"}]
+        },
+        {
+          field_name = tinyint_col
+          field_type = tinyint
+          field_value = [{equals_to = 127}]
+        },
+        {
+          field_name = smallint_col
+          field_type = smallint
+          field_value = [{equals_to = 32767}]
+         },
+        {
+          field_name = integer_col
+          field_type = int
+          field_value = [{equals_to = 3}]
+         },
+        {
+          field_name = bigint_col
+          field_type = long
+          field_value = [{equals_to = 1234567890}]
+          },
+        {
+          field_name = decimal_col
+          field_type = "decimal(22,4)"
+          field_value = [{equals_to = "55.0005"}]
+          },
+        {
+          field_name = real_col
+          field_type = float
+          field_value = [{equals_to = 67.89}]
+          },
+        {
+          field_name = double_col
+          field_type = double
+          field_value = [{equals_to = 123.45}]
+          },
+        {
+          field_name = char_col
+          field_type = string
+          field_value = [{equals_to = "8"}]
+          },
+        {
+          field_name = varchar_col
+          field_type = string
+          field_value = [{equals_to = "VarcharCol"}]
+          },
+        {
+          field_name = date_col
+          field_type = date
+          field_value = [{equals_to = "2024-01-01"}]
+          },
+        {
+          field_name = time_col
+          field_type = time
+          field_value = [{equals_to = "12:01:01"}]
+          },
+        {
+          field_name = timestamp_col
+          field_type = timestamp
+          field_value = [{equals_to = "2024-01-01T12:01:01"}]
+          },
+        {
+          field_name = varbinary_col
+          field_type = bytes
+          field_value = [{equals_to = "c3Ry"}]
+          },
+        {
+         field_name = json_col
+          field_type = string
+          field_value = [{equals_to = "{\"key\":\"val\"}"}]
+         }
+        ]
+      }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 1be4cb4ca6..b43f35359a 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -340,7 +340,7 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         } else {
             // Waiting 10s for release thread
             Awaitility.await()
-                    .atMost(30, TimeUnit.SECONDS)
+                    .atMost(60, TimeUnit.SECONDS)
                     .untilAsserted(
                             () -> {
                                 List<String> threads = 
ContainerUtil.getJVMThreadNames(server);

Reply via email to