This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cac2bd126 [Feature][Connector-V2][JDBC] Add presto/trino dialect
(#9388)
3cac2bd126 is described below
commit 3cac2bd126456b424c0c6a5045371f62c2c65b34
Author: dyp12 <[email protected]>
AuthorDate: Tue Jun 3 11:55:59 2025 +0800
[Feature][Connector-V2][JDBC] Add presto/trino dialect (#9388)
---
docs/en/connector-v2/source/Jdbc.md | 2 +
seatunnel-connectors-v2/connector-jdbc/pom.xml | 22 ++
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../internal/dialect/presto/PrestoDialect.java | 48 +++++
.../dialect/presto/PrestoDialectFactory.java | 44 ++++
.../dialect/presto/PrestoJdbcRowConverter.java | 47 +++++
.../internal/dialect/presto/PrestoTypeMapper.java | 116 +++++++++++
seatunnel-dist/pom.xml | 14 ++
.../src/main/assembly/assembly-bin-ci.xml | 2 +
.../connector-jdbc-e2e-part-7/pom.xml | 11 +
.../connectors/seatunnel/jdbc/JdbcPrestoIT.java | 213 +++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcTrinoIT.java | 231 +++++++++++++++++++++
.../resources/jdbc_presto_source_and_assert.conf | 154 ++++++++++++++
.../resources/jdbc_trino_source_and_assert.conf | 154 ++++++++++++++
.../container/seatunnel/SeaTunnelContainer.java | 2 +-
15 files changed, 1060 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 3d2d6b995a..88637bd2cc 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -163,6 +163,8 @@ there are some reference value for params above.
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS |
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres |
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo |
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
+| Presto | com.facebook.presto.jdbc.PrestoDriver |
jdbc:presto://localhost:8080/presto |
https://repo1.maven.org/maven2/com/facebook/presto/presto-jdbc/0.279/presto-jdbc-0.279.jar
|
+| Trino | io.trino.jdbc.TrinoDriver |
jdbc:trino://localhost:8080/trino |
https://repo1.maven.org/maven2/io/trino/trino-jdbc/460/trino-jdbc-460.jar
|
## Example
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 47fd43e6d8..87705fe01a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -56,6 +56,8 @@
<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
<mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>
<highgo.version>6.2.3</highgo.version>
+ <presto.version>0.279</presto.version>
+ <trino.version>460</trino.version>
</properties>
<dependencyManagement>
@@ -229,6 +231,18 @@
<version>${highgo.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ <version>${presto.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <version>${trino.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -352,6 +366,14 @@
<groupId>com.highgo</groupId>
<artifactId>HgdbJdbc</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index cf503be4d1..17f672213c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -46,4 +46,5 @@ public class DatabaseIdentifier {
public static final String OPENGAUSS = "OpenGauss";
public static final String HIGHGO = "Highgo";
public static final String GREENPLUM = "Greenplum";
+ public static final String PRESTO = "Presto";
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialect.java
new file mode 100644
index 0000000000..df3e8f5abf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialect.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Optional;
+
+/**
+ * JDBC dialect registered under {@link DatabaseIdentifier#PRESTO}.
+ *
+ * <p>It wires in the Presto row converter and type mapper and reports that no upsert statement
+ * is available.
+ */
+public class PrestoDialect implements JdbcDialect {
+
+    /** Returns the dialect name, {@link DatabaseIdentifier#PRESTO}. */
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.PRESTO;
+    }
+
+    /** Returns a new {@link PrestoTypeMapper} on every call. */
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        final JdbcDialectTypeMapper typeMapper = new PrestoTypeMapper();
+        return typeMapper;
+    }
+
+    /** Returns a new {@link PrestoJdbcRowConverter} on every call. */
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        final JdbcRowConverter rowConverter = new PrestoJdbcRowConverter();
+        return rowConverter;
+    }
+
+    /**
+     * Always returns {@link Optional#empty()}: this dialect does not build an upsert statement,
+     * whatever arguments are passed.
+     */
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        return Optional.empty();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialectFactory.java
new file mode 100644
index 0000000000..3b714ffeba
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoDialectFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+/**
+ * Factory that produces {@link PrestoDialect} instances and is published as a
+ * {@link JdbcDialectFactory} service through {@code @AutoService}.
+ */
+@AutoService(JdbcDialectFactory.class)
+public class PrestoDialectFactory implements JdbcDialectFactory {
+
+    /** Returns the factory name, {@link DatabaseIdentifier#PRESTO}. */
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.PRESTO;
+    }
+
+    /**
+     * Tells whether this factory handles the given JDBC URL.
+     *
+     * @param url JDBC URL to test; must not be null
+     * @return {@code true} when the URL starts with {@code jdbc:presto:} or {@code jdbc:trino:}
+     */
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        if (url.startsWith("jdbc:presto:")) {
+            return true;
+        }
+        return url.startsWith("jdbc:trino:");
+    }
+
+    /** Creates a new {@link PrestoDialect}. */
+    @Override
+    public JdbcDialect create() {
+        return new PrestoDialect();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoJdbcRowConverter.java
new file mode 100644
index 0000000000..01d5ba6e07
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoJdbcRowConverter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import javax.annotation.Nullable;
+
+import java.sql.PreparedStatement;
+
+/**
+ * Row converter registered under {@link DatabaseIdentifier#PRESTO}.
+ *
+ * <p>Only {@link #toExternal} is overridden, to reject sink usage; everything else is inherited
+ * from {@link AbstractJdbcRowConverter}.
+ */
+public class PrestoJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    /** Returns the converter name, {@link DatabaseIdentifier#PRESTO}. */
+    @Override
+    public String converterName() {
+        return DatabaseIdentifier.PRESTO;
+    }
+
+    /**
+     * Unconditionally rejects writing rows: none of the arguments is inspected.
+     *
+     * @throws JdbcConnectorException always, with error code
+     *     {@link JdbcConnectorErrorCode#DONT_SUPPORT_SINK}
+     */
+    @Override
+    public PreparedStatement toExternal(
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement) {
+        final String reason = "The Presto jdbc connector don't support sink";
+        throw new JdbcConnectorException(JdbcConnectorErrorCode.DONT_SUPPORT_SINK, reason);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoTypeMapper.java
new file mode 100644
index 0000000000..2ef5fa921a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/presto/PrestoTypeMapper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.presto;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonError;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/**
+ * Maps the column type names reported by JDBC result-set metadata to SeaTunnel data types for
+ * the Presto dialect.
+ *
+ * <p>MAP, ARRAY and ROW, as well as any type name not listed here, are rejected with a
+ * conversion error.
+ */
+public class PrestoTypeMapper implements JdbcDialectTypeMapper {
+    // ============================data types=====================
+
+    private static final String PRESTO_BOOLEAN = "BOOLEAN";
+
+    // -------------------------Structural----------------------------
+    private static final String PRESTO_ARRAY = "ARRAY";
+    private static final String PRESTO_MAP = "MAP";
+    private static final String PRESTO_ROW = "ROW";
+
+    // -------------------------number----------------------------
+    private static final String PRESTO_TINYINT = "TINYINT";
+    private static final String PRESTO_SMALLINT = "SMALLINT";
+    private static final String PRESTO_INTEGER = "INTEGER";
+    private static final String PRESTO_BIGINT = "BIGINT";
+    private static final String PRESTO_DECIMAL = "DECIMAL";
+    private static final String PRESTO_REAL = "REAL";
+    private static final String PRESTO_DOUBLE = "DOUBLE";
+
+    // -------------------------string----------------------------
+    private static final String PRESTO_CHAR = "CHAR";
+    private static final String PRESTO_VARCHAR = "VARCHAR";
+    private static final String PRESTO_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String PRESTO_DATE = "DATE";
+    private static final String PRESTO_TIME = "TIME";
+    private static final String PRESTO_TIMESTAMP = "TIMESTAMP";
+
+    // ------------------------------blob-------------------------
+    private static final String PRESTO_BINARY = "BINARY";
+    private static final String PRESTO_VARBINARY = "VARBINARY";
+
+    /**
+     * Resolves the SeaTunnel type of one result-set column.
+     *
+     * <p>The reported type name is upper-cased and everything from the first {@code (} onwards
+     * is dropped before the lookup, so {@code varchar(20)} is treated as {@code VARCHAR}.
+     *
+     * @param metadata result-set metadata to read the column type from
+     * @param colIndex index of the column, passed straight to the metadata accessors
+     * @return the matching SeaTunnel data type
+     * @throws SQLException if the metadata cannot be read
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        // Locale.ROOT keeps the lookup independent of the JVM default locale; with a Turkish
+        // locale "bigint".toUpperCase() would not produce "BIGINT" and no case would match.
+        String columnType =
+                metadata.getColumnTypeName(colIndex).toUpperCase(java.util.Locale.ROOT);
+        // VARCHAR(x) ---> VARCHAR
+        // NOTE(review): anything after the parameters is dropped as well, so a name such as
+        // "timestamp(3) with time zone" would be looked up as TIMESTAMP - confirm this is wanted.
+        int parametersStart = columnType.indexOf('(');
+        if (parametersStart > -1) {
+            columnType = columnType.substring(0, parametersStart);
+        }
+        switch (columnType) {
+            case PRESTO_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case PRESTO_TINYINT:
+                return BasicType.BYTE_TYPE;
+            case PRESTO_INTEGER:
+                return BasicType.INT_TYPE;
+            case PRESTO_SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case PRESTO_BIGINT:
+                return BasicType.LONG_TYPE;
+            case PRESTO_DECIMAL:
+                // Precision and scale only matter for DECIMAL, so they are read only here.
+                return new DecimalType(
+                        metadata.getPrecision(colIndex), metadata.getScale(colIndex));
+            case PRESTO_REAL:
+                return BasicType.FLOAT_TYPE;
+            case PRESTO_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case PRESTO_CHAR:
+            case PRESTO_VARCHAR:
+            case PRESTO_JSON:
+                return BasicType.STRING_TYPE;
+            case PRESTO_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case PRESTO_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case PRESTO_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case PRESTO_VARBINARY:
+            case PRESTO_BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+                // Doesn't support yet
+            case PRESTO_MAP:
+            case PRESTO_ARRAY:
+            case PRESTO_ROW:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw CommonError.convertToSeaTunnelTypeError(
+                        DatabaseIdentifier.PRESTO, columnType, jdbcColumnName);
+        }
+    }
+}
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index a747887c59..9bc14f76db 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -111,6 +111,8 @@
<aliyun.sdk.oss.version>3.4.1</aliyun.sdk.oss.version>
<jdom.version>1.1</jdom.version>
<tidb.version>3.3.5</tidb.version>
+ <presto.version>0.279</presto.version>
+ <trino.version>460</trino.version>
</properties>
<dependencies>
<!-- starters -->
@@ -982,6 +984,18 @@
<classifier>optional</classifier>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ <version>${presto.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <version>${trino.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<finalName>apache-seatunnel-${project.version}</finalName>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 2ed7f180b7..2e056a1afb 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -195,6 +195,8 @@
<include>org.apache.hive:hive-exec:jar</include>
<include>org.apache.hive:hive-service:jar</include>
<include>org.apache.thrift:libfb303:jar</include>
+ <include>com.facebook.presto:presto-jdbc:jar</include>
+ <include>io.trino:trino-jdbc:jar</include>
</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
index d9ec7df954..9fb8c4f055 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml
@@ -111,6 +111,17 @@
<artifactId>HgdbJdbc</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPrestoIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPrestoIT.java
new file mode 100644
index 0000000000..2200d4bd4a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPrestoIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * End-to-end test case for reading from Presto through the JDBC source.
+ *
+ * <p>A {@code prestodb/presto} container is created with its port 8080 bound to host port
+ * {@code PRESTO_PORT}, a table in {@code memory.default} is filled with sample rows, and
+ * {@code /jdbc_presto_source_and_assert.conf} is registered as the job config file.
+ */
+@Slf4j
+public class JdbcPrestoIT extends AbstractJdbcIT {
+    protected static final String PRESTO_IMAGE = "prestodb/presto";
+
+    private static final String PRESTO_ALIASES = "e2e-presto";
+    private static final String DRIVER_CLASS = "com.facebook.presto.jdbc.PrestoDriver";
+    private static final int PRESTO_PORT = 18080;
+    // The single %s placeholder is the port.
+    private static final String PRESTO_URL =
+            "jdbc:presto://" + HOST + ":%s/memory?timeZoneId=UTC";
+    private static final String USERNAME = "presto";
+    private static final String DATABASE = "memory.default";
+    private static final String SOURCE_TABLE = "presto_e2e_source_table";
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_presto_source_and_assert.conf");
+
+    // The %s placeholder receives the table name.
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s (\n"
+                    + " id BIGINT,\n"
+                    + "boolean_col BOOLEAN,\n"
+                    + "tinyint_col TINYINT,\n"
+                    + "smallint_col SMALLINT,\n"
+                    + "integer_col INTEGER,\n"
+                    + "bigint_col BIGINT,\n"
+                    + "decimal_col DECIMAL(22,4),\n"
+                    + "real_col REAL,\n"
+                    + "double_col DOUBLE,\n"
+                    + "char_col CHAR,\n"
+                    + "varchar_col VARCHAR,\n"
+                    + "date_col DATE,\n"
+                    + "time_col TIME,\n"
+                    + "timestamp_col TIMESTAMP,\n"
+                    + "varbinary_col VARBINARY,\n"
+                    + "json_col json\n"
+                    + ")";
+
+    /**
+     * Downloads the Presto JDBC driver jar into {@code /tmp/seatunnel/plugins/Jdbc/lib} inside
+     * the given container and fails the test when the shell command exits with a non-zero code.
+     */
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    /**
+     * Opens the JDBC connection and probes the server with {@code select 1}, retrying while the
+     * probe fails.
+     *
+     * <p>The probe is attempted up to five times with a 15 second pause between attempts. If
+     * every attempt fails a warning is logged and initialization continues as before, so the
+     * following statements surface the real error.
+     *
+     * @param jdbcUrl JDBC URL; {@code HOST} in it is replaced by the container host when a
+     *     container is available
+     * @throws SQLException if connecting or disabling auto-commit fails
+     * @throws InstantiationException if the driver class cannot be instantiated
+     * @throws IllegalAccessException if the driver constructor is not accessible
+     */
+    @Override
+    protected void initializeJdbcConnection(String jdbcUrl)
+            throws SQLException, InstantiationException, IllegalAccessException {
+        Driver driver = (Driver) loadDriverClass().newInstance();
+        Properties props = new Properties();
+
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
+
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        if (dbServer != null) {
+            jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+        }
+
+        this.connection = driver.connect(jdbcUrl, props);
+
+        // maybe the Presto server is still initializing
+        int tryTimes = 5;
+        boolean ready = false;
+        for (int i = 0; i < tryTimes && !ready; i++) {
+            try (Statement statement = connection.createStatement()) {
+                statement.executeQuery(" select 1 ");
+                ready = true;
+            } catch (SQLException e) {
+                // Keep the cause in the log instead of dropping it silently.
+                log.info("the Presto server is still initializing. wait it ", e);
+            }
+            // Pause only when another attempt follows; sleeping after the last one is wasted.
+            if (!ready && i < tryTimes - 1) {
+                try {
+                    Thread.sleep(15 * 1000);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag before translating the exception.
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        if (!ready) {
+            log.warn("the Presto server did not answer the probe query after {} tries", tryTimes);
+        }
+        this.connection.setAutoCommit(false);
+    }
+
+    /** Builds the case description (image, alias, driver, URL, table and config file). */
+    @Override
+    JdbcCase getJdbcCase() {
+        // PRESTO_URL has exactly one placeholder, the port.
+        String jdbcUrl = String.format(PRESTO_URL, PRESTO_PORT);
+        return JdbcCase.builder()
+                .dockerImage(PRESTO_IMAGE)
+                .networkAliases(PRESTO_ALIASES)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(PRESTO_PORT)
+                .localPort(PRESTO_PORT)
+                .jdbcTemplate(PRESTO_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .database(DATABASE)
+                .sourceTable(SOURCE_TABLE)
+                .catalogDatabase(DATABASE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .useSaveModeCreateTable(true)
+                .build();
+    }
+
+    /**
+     * Inserts the same sample row three times into
+     * {@code memory.default.presto_e2e_source_table}.
+     *
+     * @throws SeaTunnelRuntimeException with {@code INSERT_DATA_FAILED} when an insert fails
+     */
+    @Override
+    protected void insertTestData() {
+        try (Statement statement = connection.createStatement()) {
+            for (int i = 1; i <= 3; i++) {
+                statement.execute(
+                        "insert into memory.default.presto_e2e_source_table\n"
+                                + "values(\n"
+                                + "1,\n"
+                                + "true,\n"
+                                + "cast(127 as tinyint),\n"
+                                + "cast(32767 as smallint),\n"
+                                + "3,\n"
+                                + "1234567890,\n"
+                                + "55.0005,\n"
+                                + "67.89,\n"
+                                + "123.45,\n"
+                                + "'8',\n"
+                                + "'VarcharCol',\n"
+                                + "date '2024-01-01',\n"
+                                + "time '12:01:01',\n"
+                                + "timestamp '2024-01-01 12:01:01',\n"
+                                + "VARBINARY 'str',\n"
+                                + "json '{\"key\":\"val\"}'\n"
+                                + ")");
+            }
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+        }
+    }
+
+    /** Returns the download URL of the Presto JDBC driver jar. */
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/com/facebook/presto/presto-jdbc/0.279/presto-jdbc-0.279.jar";
+    }
+
+    /** Returns {@code null}; the rows are written by {@link #insertTestData()} instead. */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        return null;
+    }
+
+    /** Returns the field name unchanged, without quoting. */
+    @Override
+    public String quoteIdentifier(String field) {
+        return field;
+    }
+
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        // do nothing.
+    }
+
+    /**
+     * Creates the Presto container on the shared network with its alias and log consumer, and
+     * binds host port {@code PRESTO_PORT} to container port 8080.
+     */
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(PRESTO_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PRESTO_ALIASES)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PRESTO_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", PRESTO_PORT, "8080")));
+
+        return container;
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTrinoIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTrinoIT.java
new file mode 100644
index 0000000000..acd4d3145d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTrinoIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * End-to-end test case for reading from Trino through the JDBC source.
+ *
+ * <p>A {@code trinodb/trino} container is created with its port 8080 bound to host port
+ * {@code TRINO_PORT}, a table in {@code memory.default} is created and filled with sample rows,
+ * and {@code /jdbc_trino_source_and_assert.conf} is registered as the job config file.
+ */
+@Slf4j
+public class JdbcTrinoIT extends AbstractJdbcIT {
+    protected static final String TRINO_IMAGE = "trinodb/trino";
+
+    private static final String TRINO_ALIASES = "e2e-trino";
+    private static final String DRIVER_CLASS = "io.trino.jdbc.TrinoDriver";
+    private static final int TRINO_PORT = 28080;
+    // The single %s placeholder is the port.
+    private static final String TRINO_URL = "jdbc:trino://" + HOST + ":%s/memory?timezone=UTC";
+    private static final String USERNAME = "trino";
+    private static final String DATABASE = "memory.default";
+    private static final String SOURCE_TABLE = "trino_e2e_source_table";
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_trino_source_and_assert.conf");
+
+    // The %s placeholder receives the table name.
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s (\n"
+                    + " id BIGINT,\n"
+                    + "boolean_col BOOLEAN,\n"
+                    + "tinyint_col TINYINT,\n"
+                    + "smallint_col SMALLINT,\n"
+                    + "integer_col INTEGER,\n"
+                    + "bigint_col BIGINT,\n"
+                    + "decimal_col DECIMAL(22,4),\n"
+                    + "real_col REAL,\n"
+                    + "double_col DOUBLE,\n"
+                    + "char_col CHAR,\n"
+                    + "varchar_col VARCHAR,\n"
+                    + "date_col DATE,\n"
+                    + "time_col TIME,\n"
+                    + "timestamp_col TIMESTAMP,\n"
+                    + "varbinary_col VARBINARY,\n"
+                    + "json_col json\n"
+                    + ")";
+
+    /**
+     * Downloads the Trino JDBC driver jar into {@code /tmp/seatunnel/plugins/Jdbc/lib} inside
+     * the given container and fails the test when the shell command exits with a non-zero code.
+     */
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    /**
+     * Opens the JDBC connection and probes the server with {@code select 1}, retrying while the
+     * probe fails.
+     *
+     * <p>The probe is attempted up to five times with a 15 second pause between attempts. If
+     * every attempt fails a warning is logged and initialization continues as before, so the
+     * following statements surface the real error.
+     *
+     * @param jdbcUrl JDBC URL; {@code HOST} in it is replaced by the container host when a
+     *     container is available
+     * @throws SQLException if connecting fails
+     * @throws InstantiationException if the driver class cannot be instantiated
+     * @throws IllegalAccessException if the driver constructor is not accessible
+     */
+    @Override
+    protected void initializeJdbcConnection(String jdbcUrl)
+            throws SQLException, InstantiationException, IllegalAccessException {
+        Driver driver = (Driver) loadDriverClass().newInstance();
+        Properties props = new Properties();
+
+        if (StringUtils.isNotBlank(jdbcCase.getUserName())) {
+            props.put("user", jdbcCase.getUserName());
+        }
+
+        if (StringUtils.isNotBlank(jdbcCase.getPassword())) {
+            props.put("password", jdbcCase.getPassword());
+        }
+
+        if (dbServer != null) {
+            jdbcUrl = jdbcUrl.replace(HOST, dbServer.getHost());
+        }
+
+        this.connection = driver.connect(jdbcUrl, props);
+
+        // maybe the TRINO server is still initializing
+        int tryTimes = 5;
+        boolean ready = false;
+        for (int i = 0; i < tryTimes && !ready; i++) {
+            try (Statement statement = connection.createStatement()) {
+                statement.executeQuery(" select 1 ");
+                ready = true;
+            } catch (SQLException e) {
+                // Keep the cause in the log instead of dropping it silently.
+                log.info("the Trino server is still initializing. wait it ", e);
+            }
+            // Pause only when another attempt follows; sleeping after the last one is wasted.
+            if (!ready && i < tryTimes - 1) {
+                try {
+                    Thread.sleep(15 * 1000);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt flag before translating the exception.
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        if (!ready) {
+            log.warn("the Trino server did not answer the probe query after {} tries", tryTimes);
+        }
+    }
+
+    /**
+     * Creates the source table only, by formatting the create-table template with the table
+     * path built from the case's database, schema and source table.
+     *
+     * @throws SeaTunnelRuntimeException with {@code CREATE_TABLE_FAILED} when creation fails
+     */
+    @Override
+    protected void createNeededTables() {
+        try (Statement statement = connection.createStatement()) {
+            String createTemplate = jdbcCase.getCreateSql();
+
+            String createSource =
+                    String.format(
+                            createTemplate,
+                            buildTableInfoWithSchema(
+                                    jdbcCase.getDatabase(),
+                                    jdbcCase.getSchema(),
+                                    jdbcCase.getSourceTable()));
+            statement.execute(createSource);
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+    }
+
+    /** Builds the case description (image, alias, driver, URL, table and config file). */
+    @Override
+    JdbcCase getJdbcCase() {
+        // TRINO_URL has exactly one placeholder, the port.
+        String jdbcUrl = String.format(TRINO_URL, TRINO_PORT);
+        return JdbcCase.builder()
+                .dockerImage(TRINO_IMAGE)
+                .networkAliases(TRINO_ALIASES)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(TRINO_PORT)
+                .localPort(TRINO_PORT)
+                .jdbcTemplate(TRINO_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(USERNAME)
+                .database(DATABASE)
+                .sourceTable(SOURCE_TABLE)
+                .catalogDatabase(DATABASE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .useSaveModeCreateTable(true)
+                .build();
+    }
+
+    /**
+     * Inserts the same sample row three times into
+     * {@code memory.default.trino_e2e_source_table}.
+     *
+     * @throws SeaTunnelRuntimeException with {@code INSERT_DATA_FAILED} when an insert fails
+     */
+    @Override
+    protected void insertTestData() {
+        try (Statement statement = connection.createStatement()) {
+            for (int i = 1; i <= 3; i++) {
+                statement.execute(
+                        "insert into memory.default.trino_e2e_source_table\n"
+                                + "values(\n"
+                                + "1,\n"
+                                + "true,\n"
+                                + "cast(127 as tinyint),\n"
+                                + "cast(32767 as smallint),\n"
+                                + "3,\n"
+                                + "1234567890,\n"
+                                + "55.0005,\n"
+                                + "67.89,\n"
+                                + "123.45,\n"
+                                + "'8',\n"
+                                + "'VarcharCol',\n"
+                                + "date '2024-01-01',\n"
+                                + "time '12:01:01',\n"
+                                + "timestamp '2024-01-01 12:01:01',\n"
+                                + "VARBINARY 'str',\n"
+                                + "json '{\"key\":\"val\"}'\n"
+                                + ")");
+            }
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+        }
+    }
+
+    /** Returns the download URL of the Trino JDBC driver jar. */
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/io/trino/trino-jdbc/460/trino-jdbc-460.jar";
+    }
+
+    /** Returns {@code null}; the rows are written by {@link #insertTestData()} instead. */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        return null;
+    }
+
+    /** Returns the field name unchanged, without quoting. */
+    @Override
+    public String quoteIdentifier(String field) {
+        return field;
+    }
+
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        // do nothing.
+    }
+
+    /**
+     * Creates the Trino container on the shared network with its alias and log consumer, and
+     * binds host port {@code TRINO_PORT} to container port 8080.
+     */
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(TRINO_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(TRINO_ALIASES)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(TRINO_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", TRINO_PORT, "8080")));
+
+        return container;
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_presto_source_and_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_presto_source_and_assert.conf
new file mode 100644
index 0000000000..981570c91c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_presto_source_and_assert.conf
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env { # job-level settings for the Presto source check
+ parallelism = 1 # run with a parallelism of 1
+ job.mode = "BATCH" # run as a batch job
+}
+
+source {
+ jdbc { # JDBC source using the Presto driver
+ url = "jdbc:presto://e2e-Presto:8080/memory?timeZoneId=UTC" # "memory" catalog; timeZoneId=UTC is passed as a URL parameter. Host e2e-Presto is presumably the Presto container's network alias (confirm in JdbcPrestoIT)
+ driver = "com.facebook.presto.jdbc.PrestoDriver"
+ connection_check_timeout_sec = 100
+ user = "presto"
+ query = "select * from memory.default.presto_e2e_source_table" # reads every column of the source table
+ split.size = 10
+ }
+}
+
+transform { # intentionally empty: no transforms are configured
+}
+
+
+
+sink {
+assert { # Assert sink: checks the rows read from the source against the rules below
+ rules =
+ {
+ row_rules = [ # MAX_ROW and MIN_ROW are both 3, so exactly 3 rows are expected
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [ # one rule per column: expected type plus a single equals_to value check
+ {
+ field_name = id
+ field_type = long
+ field_value = [{equals_to = 1}]
+ },
+ {
+ field_name = boolean_col
+ field_type = boolean
+ field_value = [{equals_to = "TRUE"}]
+ },
+ {
+ field_name = tinyint_col
+ field_type = tinyint
+ field_value = [{equals_to = 127}]
+ },
+ {
+ field_name = smallint_col
+ field_type = smallint
+ field_value = [{equals_to = 32767}]
+ },
+ {
+ field_name = integer_col
+ field_type = int
+ field_value = [{equals_to = 3}]
+ },
+ {
+ field_name = bigint_col
+ field_type = long
+ field_value = [{equals_to = 1234567890}]
+ },
+ {
+ field_name = decimal_col
+ field_type = "decimal(22,4)"
+ field_value = [{equals_to = "55.0005"}]
+ },
+ {
+ field_name = real_col
+ field_type = float
+ field_value = [{equals_to = 67.89}]
+ },
+ {
+ field_name = double_col
+ field_type = double
+ field_value = [{equals_to = 123.45}]
+ },
+ {
+ field_name = char_col
+ field_type = string
+ field_value = [{equals_to = "8"}]
+ },
+ {
+ field_name = varchar_col
+ field_type = string
+ field_value = [{equals_to = "VarcharCol"}]
+ },
+ {
+ field_name = date_col
+ field_type = date
+ field_value = [{equals_to = "2024-01-01"}]
+ },
+ {
+ field_name = time_col
+ field_type = time
+ field_value = [{equals_to = "12:01:01"}]
+ },
+ {
+ field_name = timestamp_col
+ field_type = timestamp
+ field_value = [{equals_to = "2024-01-01T12:01:01"}]
+ },
+ {
+ field_name = varbinary_col
+ field_type = bytes
+ field_value = [{equals_to = "c3Ry"}] # "c3Ry" is the Base64 encoding of the bytes of "str"
+ },
+ {
+ field_name = json_col
+ field_type = string
+ field_value = [{equals_to = "{\"key\":\"val\"}"}]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_trino_source_and_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_trino_source_and_assert.conf
new file mode 100644
index 0000000000..69599639ce
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_trino_source_and_assert.conf
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env { # job-level settings for the Trino source check
+ parallelism = 1 # run with a parallelism of 1
+ job.mode = "BATCH" # run as a batch job
+}
+
+source {
+ jdbc { # JDBC source using the Trino driver
+ url = "jdbc:trino://e2e-trino:8080/memory?timezone=UTC" # "memory" catalog; timezone=UTC is passed as a URL parameter. Host e2e-trino is presumably the Trino container's network alias (confirm in JdbcTrinoIT)
+ driver = "io.trino.jdbc.TrinoDriver"
+ connection_check_timeout_sec = 100
+ user = "trino"
+ query = "select * from memory.default.trino_e2e_source_table" # reads every column of the source table
+ split.size = 10
+ }
+}
+
+transform { # intentionally empty: no transforms are configured
+}
+
+
+
+sink {
+assert { # Assert sink: checks the rows read from the source against the rules below
+ rules =
+ {
+ row_rules = [ # MAX_ROW and MIN_ROW are both 3, so exactly 3 rows are expected
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [ # one rule per column: expected type plus a single equals_to value check
+ {
+ field_name = id
+ field_type = long
+ field_value = [{equals_to = 1}]
+ },
+ {
+ field_name = boolean_col
+ field_type = boolean
+ field_value = [{equals_to = "TRUE"}]
+ },
+ {
+ field_name = tinyint_col
+ field_type = tinyint
+ field_value = [{equals_to = 127}]
+ },
+ {
+ field_name = smallint_col
+ field_type = smallint
+ field_value = [{equals_to = 32767}]
+ },
+ {
+ field_name = integer_col
+ field_type = int
+ field_value = [{equals_to = 3}]
+ },
+ {
+ field_name = bigint_col
+ field_type = long
+ field_value = [{equals_to = 1234567890}]
+ },
+ {
+ field_name = decimal_col
+ field_type = "decimal(22,4)"
+ field_value = [{equals_to = "55.0005"}]
+ },
+ {
+ field_name = real_col
+ field_type = float
+ field_value = [{equals_to = 67.89}]
+ },
+ {
+ field_name = double_col
+ field_type = double
+ field_value = [{equals_to = 123.45}]
+ },
+ {
+ field_name = char_col
+ field_type = string
+ field_value = [{equals_to = "8"}]
+ },
+ {
+ field_name = varchar_col
+ field_type = string
+ field_value = [{equals_to = "VarcharCol"}]
+ },
+ {
+ field_name = date_col
+ field_type = date
+ field_value = [{equals_to = "2024-01-01"}]
+ },
+ {
+ field_name = time_col
+ field_type = time
+ field_value = [{equals_to = "12:01:01"}]
+ },
+ {
+ field_name = timestamp_col
+ field_type = timestamp
+ field_value = [{equals_to = "2024-01-01T12:01:01"}]
+ },
+ {
+ field_name = varbinary_col
+ field_type = bytes
+ field_value = [{equals_to = "c3Ry"}] # "c3Ry" is the Base64 encoding of the bytes of "str"
+ },
+ {
+ field_name = json_col
+ field_type = string
+ field_value = [{equals_to = "{\"key\":\"val\"}"}]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 1be4cb4ca6..b43f35359a 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -340,7 +340,7 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
} else {
// Waiting 10s for release thread
Awaitility.await()
- .atMost(30, TimeUnit.SECONDS)
+ .atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() -> {
List<String> threads =
ContainerUtil.getJVMThreadNames(server);