This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 69f79af3a4 [Improve] Improve Jdbc connector error message when 
datatype unsupported (#5864)
69f79af3a4 is described below

commit 69f79af3a4ceb3f2d2d4db6fe0324e46011372d7
Author: Jia Fan <[email protected]>
AuthorDate: Mon Nov 20 16:41:56 2023 +0800

    [Improve] Improve Jdbc connector error message when datatype unsupported 
(#5864)
---
 .../seatunnel/common/exception/CommonError.java    |  35 ++++
 .../common/exception/CommonErrorCode.java          |  12 +-
 .../exception/SeaTunnelRuntimeException.java       |  24 +++
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  24 ++-
 .../seatunnel/jdbc/catalog/utils/CatalogUtils.java |  39 +++-
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |  53 +++--
 .../connectors/seatunnel/jdbc/JdbcErrorIT.java     | 213 +++++++++++++++++++++
 7 files changed, 370 insertions(+), 30 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index daa747255f..74d7c0efd3 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.common.exception;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.seatunnel.common.constants.PluginType;
 
 import java.util.HashMap;
@@ -26,6 +29,8 @@ import static 
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_C
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
 
 /**
@@ -37,6 +42,8 @@ import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_
  */
 public class CommonError {
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public static SeaTunnelRuntimeException unsupportedDataType(
             String identifier, String dataType, String field) {
         Map<String, String> params = new HashMap<>();
@@ -83,4 +90,32 @@ public class CommonError {
         params.put("field", field);
         return new 
SeaTunnelRuntimeException(CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE, params);
     }
+
+    public static SeaTunnelRuntimeException getCatalogTableWithUnsupportedType(
+            String catalogName, String tableName, Map<String, String> 
fieldWithDataTypes) {
+        Map<String, String> params = new HashMap<>();
+        params.put("catalogName", catalogName);
+        params.put("tableName", tableName);
+        try {
+            params.put("fieldWithDataTypes", 
OBJECT_MAPPER.writeValueAsString(fieldWithDataTypes));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+        return new 
SeaTunnelRuntimeException(GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR, 
params);
+    }
+
+    public static SeaTunnelRuntimeException 
getCatalogTablesWithUnsupportedType(
+            String catalogName, Map<String, Map<String, String>> 
tableUnsupportedTypes) {
+        Map<String, String> params = new HashMap<>();
+        params.put("catalogName", catalogName);
+        try {
+            params.put(
+                    "tableUnsupportedTypes",
+                    OBJECT_MAPPER.writeValueAsString(tableUnsupportedTypes));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+        return new SeaTunnelRuntimeException(
+                GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
+    }
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 6824d8bbb9..a0817a4e72 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.common.exception;
 
-/** SeaTunnel connector error code interface, it only should be invoked by 
{@link CommonError} */
-enum CommonErrorCode implements SeaTunnelErrorCode {
+/** SeaTunnel connector error code interface */
+public enum CommonErrorCode implements SeaTunnelErrorCode {
     UNSUPPORTED_DATA_TYPE(
             "COMMON-07", "'<identifier>' unsupported data type '<dataType>' of 
'<field>'"),
     CONVERT_TO_SEATUNNEL_TYPE_ERROR(
@@ -32,7 +32,13 @@ enum CommonErrorCode implements SeaTunnelErrorCode {
             "'<connector>' <type> unsupported convert SeaTunnel data type 
'<dataType>' of '<field>' to connector data type."),
     CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE(
             "COMMON-19",
-            "'<identifier>' unsupported convert SeaTunnel data type 
'<dataType>' of '<field>' to connector data type.");
+            "'<identifier>' unsupported convert SeaTunnel data type 
'<dataType>' of '<field>' to connector data type."),
+    GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR(
+            "COMMON-20",
+            "'<catalogName>' table '<tableName>' unsupported get catalog table 
with field data types '<fieldWithDataTypes>'"),
+    GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR(
+            "COMMON-21",
+            "'<catalogName>' tables unsupported get catalog table,the 
corresponding field types in the following tables are not supported: 
'<tableUnsupportedTypes>'");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
index ff8743d68a..4f6f021522 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java
@@ -17,11 +17,18 @@
 
 package org.apache.seatunnel.common.exception;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.util.HashMap;
 import java.util.Map;
 
 /** SeaTunnel global exception, used to tell user more clearly error messages 
*/
 public class SeaTunnelRuntimeException extends RuntimeException {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private final SeaTunnelErrorCode seaTunnelErrorCode;
     private final Map<String, String> params;
 
@@ -64,4 +71,21 @@ public class SeaTunnelRuntimeException extends 
RuntimeException {
     public Map<String, String> getParams() {
         return params;
     }
+
+    public Map<String, String> getParamsValueAsMap(String key) {
+        try {
+            return OBJECT_MAPPER.readValue(
+                    params.get(key), new TypeReference<Map<String, String>>() 
{});
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public <T> T getParamsValueAs(String key) {
+        try {
+            return OBJECT_MAPPER.readValue(params.get(key), new 
TypeReference<T>() {});
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 3802a7a747..291333459e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -31,6 +31,9 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistExce
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 
@@ -49,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -170,8 +174,23 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
                     ResultSet resultSet = ps.executeQuery()) {
 
                 TableSchema.Builder builder = TableSchema.builder();
+                Map<String, String> unsupported = new LinkedHashMap<>();
                 while (resultSet.next()) {
-                    builder.column(buildColumn(resultSet));
+                    try {
+                        builder.column(buildColumn(resultSet));
+                    } catch (SeaTunnelRuntimeException e) {
+                        if (e.getSeaTunnelErrorCode()
+                                
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                            unsupported.put(
+                                    e.getParams().get("field"), 
e.getParams().get("dataType"));
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+                if (!unsupported.isEmpty()) {
+                    throw CommonError.getCatalogTableWithUnsupportedType(
+                            catalogName, tablePath.getFullName(), unsupported);
                 }
                 // add primary key
                 primaryKey.ifPresent(builder::primaryKey);
@@ -186,7 +205,8 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
                         "",
                         catalogName);
             }
-
+        } catch (SeaTunnelRuntimeException e) {
+            throw e;
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed getting table %s", 
tablePath.getFullName()), e);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index b14de7cd15..1f35098eea 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -24,6 +24,9 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
@@ -40,6 +43,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -201,7 +205,7 @@ public class CatalogUtils {
                 catalogName);
     }
 
-    public static CatalogTable getCatalogTable(ResultSetMetaData 
resultSetMetaData)
+    public static CatalogTable getCatalogTable(ResultSetMetaData 
resultSetMetaData, String sqlQuery)
             throws SQLException {
         return getCatalogTable(
                 resultSetMetaData,
@@ -212,11 +216,13 @@ public class CatalogUtils {
                             } catch (SQLException e) {
                                 throw new RuntimeException(e);
                             }
-                        });
+                        },
+                sqlQuery);
     }
 
     public static CatalogTable getCatalogTable(
-            ResultSetMetaData metadata, JdbcDialectTypeMapper typeMapper) 
throws SQLException {
+            ResultSetMetaData metadata, JdbcDialectTypeMapper typeMapper, 
String sqlQuery)
+            throws SQLException {
         return getCatalogTable(
                 metadata,
                 (BiFunction<ResultSetMetaData, Integer, Column>)
@@ -226,17 +232,32 @@ public class CatalogUtils {
                             } catch (SQLException e) {
                                 throw new RuntimeException(e);
                             }
-                        });
+                        },
+                sqlQuery);
     }
 
     public static CatalogTable getCatalogTable(
             ResultSetMetaData metadata,
-            BiFunction<ResultSetMetaData, Integer, Column> columnConverter)
+            BiFunction<ResultSetMetaData, Integer, Column> columnConverter,
+            String sqlQuery)
             throws SQLException {
         TableSchema.Builder schemaBuilder = TableSchema.builder();
+        Map<String, String> unsupported = new LinkedHashMap<>();
         for (int index = 1; index <= metadata.getColumnCount(); index++) {
-            Column column = columnConverter.apply(metadata, index);
-            schemaBuilder.column(column);
+            try {
+                Column column = columnConverter.apply(metadata, index);
+                schemaBuilder.column(column);
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                    unsupported.put(e.getParams().get("field"), 
e.getParams().get("dataType"));
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (!unsupported.isEmpty()) {
+            throw CommonError.getCatalogTableWithUnsupportedType("UNKNOWN", 
sqlQuery, unsupported);
         }
         String catalogName = "jdbc_catalog";
         return CatalogTable.of(
@@ -252,7 +273,7 @@ public class CatalogUtils {
             Connection connection, String sqlQuery, JdbcDialectTypeMapper 
typeMapper)
             throws SQLException {
         try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
-            return getCatalogTable(ps.getMetaData(), typeMapper);
+            return getCatalogTable(ps.getMetaData(), typeMapper, sqlQuery);
         }
     }
 
@@ -261,7 +282,7 @@ public class CatalogUtils {
         ResultSetMetaData resultSetMetaData;
         try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
             resultSetMetaData = ps.getMetaData();
-            return getCatalogTable(resultSetMetaData);
+            return getCatalogTable(resultSetMetaData, sqlQuery);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 0760846701..a45f7ce7c5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -27,6 +27,9 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
@@ -73,22 +76,40 @@ public class JdbcCatalogUtils {
                 log.info("Loading catalog tables for catalog : {}", 
jdbcCatalog.getClass());
 
                 jdbcCatalog.open();
+                Map<String, Map<String, String>> unsupportedTable = new 
HashMap<>();
                 for (JdbcSourceTableConfig tableConfig : tablesConfig) {
-                    CatalogTable catalogTable =
-                            getCatalogTable(tableConfig, jdbcCatalog, 
jdbcDialect);
-                    TablePath tablePath = 
catalogTable.getTableId().toTablePath();
-                    JdbcSourceTable jdbcSourceTable =
-                            JdbcSourceTable.builder()
-                                    .tablePath(tablePath)
-                                    .query(tableConfig.getQuery())
-                                    
.partitionColumn(tableConfig.getPartitionColumn())
-                                    
.partitionNumber(tableConfig.getPartitionNumber())
-                                    
.partitionStart(tableConfig.getPartitionStart())
-                                    
.partitionEnd(tableConfig.getPartitionEnd())
-                                    .catalogTable(catalogTable)
-                                    .build();
-                    tables.put(tablePath, jdbcSourceTable);
-                    log.info("Loaded catalog table : {}, {}", tablePath, 
jdbcSourceTable);
+                    try {
+                        CatalogTable catalogTable =
+                                getCatalogTable(tableConfig, jdbcCatalog, 
jdbcDialect);
+                        TablePath tablePath = 
catalogTable.getTableId().toTablePath();
+                        JdbcSourceTable jdbcSourceTable =
+                                JdbcSourceTable.builder()
+                                        .tablePath(tablePath)
+                                        .query(tableConfig.getQuery())
+                                        
.partitionColumn(tableConfig.getPartitionColumn())
+                                        
.partitionNumber(tableConfig.getPartitionNumber())
+                                        
.partitionStart(tableConfig.getPartitionStart())
+                                        
.partitionEnd(tableConfig.getPartitionEnd())
+                                        .catalogTable(catalogTable)
+                                        .build();
+                        tables.put(tablePath, jdbcSourceTable);
+                        log.info("Loaded catalog table : {}, {}", tablePath, 
jdbcSourceTable);
+                    } catch (SeaTunnelRuntimeException e) {
+                        if (e.getSeaTunnelErrorCode()
+                                .equals(
+                                        CommonErrorCode
+                                                
.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) {
+                            unsupportedTable.put(
+                                    e.getParams().get("tableName"),
+                                    
e.getParamsValueAsMap("fieldWithDataTypes"));
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+                if (!unsupportedTable.isEmpty()) {
+                    throw CommonError.getCatalogTablesWithUnsupportedType(
+                            jdbcDialect.dialectName(), unsupportedTable);
                 }
                 log.info(
                         "Loaded {} catalog tables for catalog : {}",
@@ -328,7 +349,7 @@ public class JdbcCatalogUtils {
         ResultSetMetaData resultSetMetaData =
                 jdbcDialect.getResultSetMetaData(connection, sqlQuery);
         return CatalogUtils.getCatalogTable(
-                resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper());
+                resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper(), 
sqlQuery);
     }
 
     private static Connection getConnection(JdbcConnectionConfig config, 
JdbcDialect jdbcDialect)
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java
new file mode 100644
index 0000000000..fa30652b04
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcErrorIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceFactory;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * This test case is used to test that the jdbc connector returns the expected 
error when
+ * encountering an unsupported data type. If a certain type is supported and 
the test case becomes
+ * invalid, we need to find a replacement to allow the test case to continue 
to be executed,
+ * instead of deleting it.
+ */
+@Slf4j
+public class JdbcErrorIT extends TestSuiteBase implements TestResource {
+    private static final String PG_IMAGE = "postgis/postgis";
+    private PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+    private static final String PG_SOURCE_DDL1 =
+            "CREATE TABLE IF NOT EXISTS pg_e2e_source_table1 (\n"
+                    + "  gid SERIAL PRIMARY KEY,"
+                    + " timearray1 timestamp[],"
+                    + " timearray2 timestamp[]\n"
+                    + ")";
+    private static final String PG_SOURCE_DDL2 =
+            "CREATE TABLE IF NOT EXISTS pg_e2e_source_table2 (\n"
+                    + "  gid SERIAL PRIMARY KEY,"
+                    + " str VARCHAR(255),"
+                    + " timearray2 timestamp[]\n"
+                    + ")";
+    private static final String PG_SOURCE_DDL3 =
+            "CREATE TABLE IF NOT EXISTS pg_e2e_source_table3 (\n"
+                    + "  gid SERIAL PRIMARY KEY,"
+                    + " str1 VARCHAR(255),"
+                    + " str2 VARCHAR(255)\n"
+                    + ")";
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(
+                                DockerImageName.parse(PG_IMAGE)
+                                        .asCompatibleSubstituteFor("postgres"))
+                        .withNetwork(TestSuiteBase.NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withCommand("postgres -c 
max_prepared_transactions=100")
+                        .withDatabaseName("seatunnel")
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+        Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+        log.info("PostgreSQL container started");
+        Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+        log.info("pg data initialization succeeded. Procedure");
+    }
+
+    @Test
+    void testThrowMultiTableAndFieldsInfoWhenDataTypeUnsupported() {
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        new HashMap<String, Object>() {
+                            {
+                                put("url", POSTGRESQL_CONTAINER.getJdbcUrl());
+                                put("driver", "org.postgresql.Driver");
+                                put("user", 
POSTGRESQL_CONTAINER.getUsername());
+                                put("password", 
POSTGRESQL_CONTAINER.getPassword());
+                                put(
+                                        "table_list",
+                                        new ArrayList<Map<String, Object>>() {
+                                            {
+                                                add(
+                                                        new HashMap<String, 
Object>() {
+                                                            {
+                                                                put(
+                                                                        
"table_path",
+                                                                        
"seatunnel.public.pg_e2e_source_table1");
+                                                            }
+                                                        });
+                                                add(
+                                                        new HashMap<String, 
Object>() {
+                                                            {
+                                                                put(
+                                                                        
"table_path",
+                                                                        
"seatunnel.public.pg_e2e_source_table2");
+                                                                put(
+                                                                        
"query",
+                                                                        
"select * from seatunnel.public.pg_e2e_source_table2");
+                                                            }
+                                                        });
+                                                add(
+                                                        new HashMap<String, 
Object>() {
+                                                            {
+                                                                put(
+                                                                        
"table_path",
+                                                                        
"seatunnel.public.pg_e2e_source_table3");
+                                                            }
+                                                        });
+                                            }
+                                        });
+                            }
+                        });
+        TableSourceFactoryContext context =
+                new TableSourceFactoryContext(
+                        config, 
Thread.currentThread().getContextClassLoader());
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () -> {
+                            SeaTunnelSource source =
+                                    new 
JdbcSourceFactory().createSource(context).createSource();
+                            source.getProducedCatalogTables();
+                        });
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-21], ErrorDescription:['Postgres' tables 
unsupported get catalog table,"
+                        + "the corresponding field types in the following 
tables are not supported:"
+                        + " 
'{\"seatunnel.public.pg_e2e_source_table1\":{\"timearray1\":\"_timestamp\",\"timearray2\":\"_timestamp\"},"
+                        + "\"select * from 
seatunnel.public.pg_e2e_source_table2\":{\"timearray2\":\"_timestamp\"}}']",
+                exception.getMessage());
+        Map<String, Map<String, String>> result = new LinkedHashMap<>();
+        result.put(
+                "seatunnel.public.pg_e2e_source_table1",
+                new HashMap<String, String>() {
+                    {
+                        put("timearray1", "_timestamp");
+                        put("timearray2", "_timestamp");
+                    }
+                });
+        result.put(
+                "select * from seatunnel.public.pg_e2e_source_table2",
+                new HashMap<String, String>() {
+                    {
+                        put("timearray2", "_timestamp");
+                    }
+                });
+        Assertions.assertEquals(result, 
exception.getParamsValueAs("tableUnsupportedTypes"));
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = getJdbcConnection()) {
+            Statement statement = connection.createStatement();
+            statement.execute(PG_SOURCE_DDL1);
+            statement.execute(PG_SOURCE_DDL2);
+            statement.execute(PG_SOURCE_DDL3);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table 
failed!", e);
+        }
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                POSTGRESQL_CONTAINER.getJdbcUrl(),
+                POSTGRESQL_CONTAINER.getUsername(),
+                POSTGRESQL_CONTAINER.getPassword());
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (POSTGRESQL_CONTAINER != null) {
+            POSTGRESQL_CONTAINER.stop();
+        }
+    }
+}

Reply via email to