This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 69f79af3a4 [Improve] Improve Jdbc connector error message when
datatype unsupported (#5864)
69f79af3a4 is described below
commit 69f79af3a4ceb3f2d2d4db6fe0324e46011372d7
Author: Jia Fan <[email protected]>
AuthorDate: Mon Nov 20 16:41:56 2023 +0800
[Improve] Improve Jdbc connector error message when datatype unsupported
(#5864)
---
.../seatunnel/common/exception/CommonError.java | 35 ++++
.../common/exception/CommonErrorCode.java | 12 +-
.../exception/SeaTunnelRuntimeException.java | 24 +++
.../jdbc/catalog/AbstractJdbcCatalog.java | 24 ++-
.../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 39 +++-
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 53 +++--
.../connectors/seatunnel/jdbc/JdbcErrorIT.java | 213 +++++++++++++++++++++
7 files changed, 370 insertions(+), 30 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index daa747255f..74d7c0efd3 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.common.exception;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.seatunnel.common.constants.PluginType;
import java.util.HashMap;
@@ -26,6 +29,8 @@ import static
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_C
import static
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
/**
@@ -37,6 +42,8 @@ import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_
*/
public class CommonError {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
public static SeaTunnelRuntimeException unsupportedDataType(
String identifier, String dataType, String field) {
Map<String, String> params = new HashMap<>();
@@ -83,4 +90,32 @@ public class CommonError {
params.put("field", field);
return new
SeaTunnelRuntimeException(CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE, params);
}
+
+ public static SeaTunnelRuntimeException getCatalogTableWithUnsupportedType(
+ String catalogName, String tableName, Map<String, String>
fieldWithDataTypes) {
+ Map<String, String> params = new HashMap<>();
+ params.put("catalogName", catalogName);
+ params.put("tableName", tableName);
+ try {
+ params.put("fieldWithDataTypes",
OBJECT_MAPPER.writeValueAsString(fieldWithDataTypes));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ return new
SeaTunnelRuntimeException(GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR,
params);
+ }
+
+ public static SeaTunnelRuntimeException
getCatalogTablesWithUnsupportedType(
+ String catalogName, Map<String, Map<String, String>>
tableUnsupportedTypes) {
+ Map<String, String> params = new HashMap<>();
+ params.put("catalogName", catalogName);
+ try {
+ params.put(
+ "tableUnsupportedTypes",
+ OBJECT_MAPPER.writeValueAsString(tableUnsupportedTypes));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ return new SeaTunnelRuntimeException(
+ GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
+ }
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 6824d8bbb9..a0817a4e72 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.common.exception;
-/** SeaTunnel connector error code interface, it only should be invoked by
{@link CommonError} */
-enum CommonErrorCode implements SeaTunnelErrorCode {
+/** SeaTunnel connector error code interface */
+public enum CommonErrorCode implements SeaTunnelErrorCode {
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of
'<field>'"),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
@@ -32,7 +32,13 @@ enum CommonErrorCode implements SeaTunnelErrorCode {
"'<connector>' <type> unsupported convert SeaTunnel data type
'<dataType>' of '<field>' to connector data type."),
CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE(
"COMMON-19",
- "'<identifier>' unsupported convert SeaTunnel data type
'<dataType>' of '<field>' to connector data type.");
+ "'<identifier>' unsupported convert SeaTunnel data type
'<dataType>' of '<field>' to connector data type."),
+ GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR(
+ "COMMON-20",
+ "'<catalogName>' table '<tableName>' unsupported get catalog table
with field data types '<fieldWithDataTypes>'"),
+ GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR(
+ "COMMON-21",
+ "'<catalogName>' tables unsupported get catalog table,the
corresponding field types in the following tables are not supported:
'<tableUnsupportedTypes>'");
private final String code;
private final String description;
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
index ff8743d68a..4f6f021522 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
@@ -17,11 +17,18 @@
package org.apache.seatunnel.common.exception;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
import java.util.HashMap;
import java.util.Map;
/** SeaTunnel global exception, used to tell user more clearly error messages
*/
public class SeaTunnelRuntimeException extends RuntimeException {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private final SeaTunnelErrorCode seaTunnelErrorCode;
private final Map<String, String> params;
@@ -64,4 +71,21 @@ public class SeaTunnelRuntimeException extends
RuntimeException {
public Map<String, String> getParams() {
return params;
}
+
+ public Map<String, String> getParamsValueAsMap(String key) {
+ try {
+ return OBJECT_MAPPER.readValue(
+ params.get(key), new TypeReference<Map<String, String>>()
{});
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public <T> T getParamsValueAs(String key) {
+ try {
+ return OBJECT_MAPPER.readValue(params.get(key), new
TypeReference<T>() {});
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 3802a7a747..291333459e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -31,6 +31,9 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
@@ -49,6 +52,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -170,8 +174,23 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
ResultSet resultSet = ps.executeQuery()) {
TableSchema.Builder builder = TableSchema.builder();
+ Map<String, String> unsupported = new LinkedHashMap<>();
while (resultSet.next()) {
- builder.column(buildColumn(resultSet));
+ try {
+ builder.column(buildColumn(resultSet));
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+ unsupported.put(
+ e.getParams().get("field"),
e.getParams().get("dataType"));
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (!unsupported.isEmpty()) {
+ throw CommonError.getCatalogTableWithUnsupportedType(
+ catalogName, tablePath.getFullName(), unsupported);
}
// add primary key
primaryKey.ifPresent(builder::primaryKey);
@@ -186,7 +205,8 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
"",
catalogName);
}
-
+ } catch (SeaTunnelRuntimeException e) {
+ throw e;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s",
tablePath.getFullName()), e);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index b14de7cd15..1f35098eea 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -24,6 +24,9 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
@@ -40,6 +43,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -201,7 +205,7 @@ public class CatalogUtils {
catalogName);
}
- public static CatalogTable getCatalogTable(ResultSetMetaData
resultSetMetaData)
+ public static CatalogTable getCatalogTable(ResultSetMetaData
resultSetMetaData, String sqlQuery)
throws SQLException {
return getCatalogTable(
resultSetMetaData,
@@ -212,11 +216,13 @@ public class CatalogUtils {
} catch (SQLException e) {
throw new RuntimeException(e);
}
- });
+ },
+ sqlQuery);
}
public static CatalogTable getCatalogTable(
- ResultSetMetaData metadata, JdbcDialectTypeMapper typeMapper)
throws SQLException {
+ ResultSetMetaData metadata, JdbcDialectTypeMapper typeMapper,
String sqlQuery)
+ throws SQLException {
return getCatalogTable(
metadata,
(BiFunction<ResultSetMetaData, Integer, Column>)
@@ -226,17 +232,32 @@ public class CatalogUtils {
} catch (SQLException e) {
throw new RuntimeException(e);
}
- });
+ },
+ sqlQuery);
}
public static CatalogTable getCatalogTable(
ResultSetMetaData metadata,
- BiFunction<ResultSetMetaData, Integer, Column> columnConverter)
+ BiFunction<ResultSetMetaData, Integer, Column> columnConverter,
+ String sqlQuery)
throws SQLException {
TableSchema.Builder schemaBuilder = TableSchema.builder();
+ Map<String, String> unsupported = new LinkedHashMap<>();
for (int index = 1; index <= metadata.getColumnCount(); index++) {
- Column column = columnConverter.apply(metadata, index);
- schemaBuilder.column(column);
+ try {
+ Column column = columnConverter.apply(metadata, index);
+ schemaBuilder.column(column);
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+ unsupported.put(e.getParams().get("field"),
e.getParams().get("dataType"));
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (!unsupported.isEmpty()) {
+ throw CommonError.getCatalogTableWithUnsupportedType("UNKNOWN",
sqlQuery, unsupported);
}
String catalogName = "jdbc_catalog";
return CatalogTable.of(
@@ -252,7 +273,7 @@ public class CatalogUtils {
Connection connection, String sqlQuery, JdbcDialectTypeMapper
typeMapper)
throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
- return getCatalogTable(ps.getMetaData(), typeMapper);
+ return getCatalogTable(ps.getMetaData(), typeMapper, sqlQuery);
}
}
@@ -261,7 +282,7 @@ public class CatalogUtils {
ResultSetMetaData resultSetMetaData;
try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
resultSetMetaData = ps.getMetaData();
- return getCatalogTable(resultSetMetaData);
+ return getCatalogTable(resultSetMetaData, sqlQuery);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 0760846701..a45f7ce7c5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -27,6 +27,9 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
@@ -73,22 +76,40 @@ public class JdbcCatalogUtils {
log.info("Loading catalog tables for catalog : {}",
jdbcCatalog.getClass());
jdbcCatalog.open();
+ Map<String, Map<String, String>> unsupportedTable = new
HashMap<>();
for (JdbcSourceTableConfig tableConfig : tablesConfig) {
- CatalogTable catalogTable =
- getCatalogTable(tableConfig, jdbcCatalog,
jdbcDialect);
- TablePath tablePath =
catalogTable.getTableId().toTablePath();
- JdbcSourceTable jdbcSourceTable =
- JdbcSourceTable.builder()
- .tablePath(tablePath)
- .query(tableConfig.getQuery())
-
.partitionColumn(tableConfig.getPartitionColumn())
-
.partitionNumber(tableConfig.getPartitionNumber())
-
.partitionStart(tableConfig.getPartitionStart())
-
.partitionEnd(tableConfig.getPartitionEnd())
- .catalogTable(catalogTable)
- .build();
- tables.put(tablePath, jdbcSourceTable);
- log.info("Loaded catalog table : {}, {}", tablePath,
jdbcSourceTable);
+ try {
+ CatalogTable catalogTable =
+ getCatalogTable(tableConfig, jdbcCatalog,
jdbcDialect);
+ TablePath tablePath =
catalogTable.getTableId().toTablePath();
+ JdbcSourceTable jdbcSourceTable =
+ JdbcSourceTable.builder()
+ .tablePath(tablePath)
+ .query(tableConfig.getQuery())
+
.partitionColumn(tableConfig.getPartitionColumn())
+
.partitionNumber(tableConfig.getPartitionNumber())
+
.partitionStart(tableConfig.getPartitionStart())
+
.partitionEnd(tableConfig.getPartitionEnd())
+ .catalogTable(catalogTable)
+ .build();
+ tables.put(tablePath, jdbcSourceTable);
+ log.info("Loaded catalog table : {}, {}", tablePath,
jdbcSourceTable);
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+ .equals(
+ CommonErrorCode
+
.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) {
+ unsupportedTable.put(
+ e.getParams().get("tableName"),
+
e.getParamsValueAsMap("fieldWithDataTypes"));
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (!unsupportedTable.isEmpty()) {
+ throw CommonError.getCatalogTablesWithUnsupportedType(
+ jdbcDialect.dialectName(), unsupportedTable);
}
log.info(
"Loaded {} catalog tables for catalog : {}",
@@ -328,7 +349,7 @@ public class JdbcCatalogUtils {
ResultSetMetaData resultSetMetaData =
jdbcDialect.getResultSetMetaData(connection, sqlQuery);
return CatalogUtils.getCatalogTable(
- resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper());
+ resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper(),
sqlQuery);
}
private static Connection getConnection(JdbcConnectionConfig config,
JdbcDialect jdbcDialect)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java
new file mode 100644
index 0000000000..fa30652b04
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceFactory;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * This test case is used to test that the jdbc connector returns the expected
error when
+ * encountering an unsupported data type. If a certain type is supported and
the test case becomes
+ * invalid, we need to find a replacement to allow the test case to continue
to be executed,
+ * instead of deleting it.
+ */
+@Slf4j
+public class JdbcErrorIT extends TestSuiteBase implements TestResource {
+ private static final String PG_IMAGE = "postgis/postgis";
+ private PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+ private static final String PG_SOURCE_DDL1 =
+ "CREATE TABLE IF NOT EXISTS pg_e2e_source_table1 (\n"
+ + " gid SERIAL PRIMARY KEY,"
+ + " timearray1 timestamp[],"
+ + " timearray2 timestamp[]\n"
+ + ")";
+ private static final String PG_SOURCE_DDL2 =
+ "CREATE TABLE IF NOT EXISTS pg_e2e_source_table2 (\n"
+ + " gid SERIAL PRIMARY KEY,"
+ + " str VARCHAR(255),"
+ + " timearray2 timestamp[]\n"
+ + ")";
+ private static final String PG_SOURCE_DDL3 =
+ "CREATE TABLE IF NOT EXISTS pg_e2e_source_table3 (\n"
+ + " gid SERIAL PRIMARY KEY,"
+ + " str1 VARCHAR(255),"
+ + " str2 VARCHAR(255)\n"
+ + ")";
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ POSTGRESQL_CONTAINER =
+ new PostgreSQLContainer<>(
+ DockerImageName.parse(PG_IMAGE)
+ .asCompatibleSubstituteFor("postgres"))
+ .withNetwork(TestSuiteBase.NETWORK)
+ .withNetworkAliases("postgresql")
+ .withCommand("postgres -c
max_prepared_transactions=100")
+ .withDatabaseName("seatunnel")
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+ Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+ log.info("PostgreSQL container started");
+ Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeJdbcTable);
+ log.info("pg data initialization succeeded. Procedure");
+ }
+
+ @Test
+ void testThrowMultiTableAndFieldsInfoWhenDataTypeUnsupported() {
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("url", POSTGRESQL_CONTAINER.getJdbcUrl());
+ put("driver", "org.postgresql.Driver");
+ put("user",
POSTGRESQL_CONTAINER.getUsername());
+ put("password",
POSTGRESQL_CONTAINER.getPassword());
+ put(
+ "table_list",
+ new ArrayList<Map<String, Object>>() {
+ {
+ add(
+ new HashMap<String,
Object>() {
+ {
+ put(
+
"table_path",
+
"seatunnel.public.pg_e2e_source_table1");
+ }
+ });
+ add(
+ new HashMap<String,
Object>() {
+ {
+ put(
+
"table_path",
+
"seatunnel.public.pg_e2e_source_table2");
+ put(
+
"query",
+
"select * from seatunnel.public.pg_e2e_source_table2");
+ }
+ });
+ add(
+ new HashMap<String,
Object>() {
+ {
+ put(
+
"table_path",
+
"seatunnel.public.pg_e2e_source_table3");
+ }
+ });
+ }
+ });
+ }
+ });
+ TableSourceFactoryContext context =
+ new TableSourceFactoryContext(
+ config,
Thread.currentThread().getContextClassLoader());
+ SeaTunnelRuntimeException exception =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () -> {
+ SeaTunnelSource source =
+ new
JdbcSourceFactory().createSource(context).createSource();
+ source.getProducedCatalogTables();
+ });
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-21], ErrorDescription:['Postgres' tables
unsupported get catalog table,"
+ + "the corresponding field types in the following
tables are not supported:"
+ + "
'{\"seatunnel.public.pg_e2e_source_table1\":{\"timearray1\":\"_timestamp\",\"timearray2\":\"_timestamp\"},"
+ + "\"select * from
seatunnel.public.pg_e2e_source_table2\":{\"timearray2\":\"_timestamp\"}}']",
+ exception.getMessage());
+ Map<String, Map<String, String>> result = new LinkedHashMap<>();
+ result.put(
+ "seatunnel.public.pg_e2e_source_table1",
+ new HashMap<String, String>() {
+ {
+ put("timearray1", "_timestamp");
+ put("timearray2", "_timestamp");
+ }
+ });
+ result.put(
+ "select * from seatunnel.public.pg_e2e_source_table2",
+ new HashMap<String, String>() {
+ {
+ put("timearray2", "_timestamp");
+ }
+ });
+ Assertions.assertEquals(result,
exception.getParamsValueAs("tableUnsupportedTypes"));
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = getJdbcConnection()) {
+ Statement statement = connection.createStatement();
+ statement.execute(PG_SOURCE_DDL1);
+ statement.execute(PG_SOURCE_DDL2);
+ statement.execute(PG_SOURCE_DDL3);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table
failed!", e);
+ }
+ }
+
+ private Connection getJdbcConnection() throws SQLException {
+ return DriverManager.getConnection(
+ POSTGRESQL_CONTAINER.getJdbcUrl(),
+ POSTGRESQL_CONTAINER.getUsername(),
+ POSTGRESQL_CONTAINER.getPassword());
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (POSTGRESQL_CONTAINER != null) {
+ POSTGRESQL_CONTAINER.stop();
+ }
+ }
+}