This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b9791285a0 [Feature] Add unsupported datatype check for all catalog 
(#5890)
b9791285a0 is described below

commit b9791285a09fa247ce542a59423479282b23f1f9
Author: Jia Fan <[email protected]>
AuthorDate: Sun Dec 10 16:09:35 2023 +0800

    [Feature] Add unsupported datatype check for all catalog (#5890)
    
    * [Feature] Add unsupported datatype check for all catalog
    
    * update
    
    * update
---
 .../seatunnel/api/table/catalog/Catalog.java       | 76 ++++++++++++++++++----
 .../api/table/catalog/CatalogTableTest.java        | 54 +++++++++++++++
 .../api/table/catalog/InMemoryCatalog.java         | 48 +++++++++++++-
 .../catalog/ElasticSearchCatalog.java              | 30 +++++----
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 46 ++++++++-----
 .../seatunnel/jdbc/catalog/dm/DamengCatalog.java   | 29 +++++++++
 .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 25 -------
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |  2 +-
 .../seatunnel/kudu/catalog/KuduCatalog.java        | 21 ++++--
 .../seatunnel/kudu/kuduclient/KuduTypeMapper.java  |  4 +-
 .../maxcompute/catalog/MaxComputeCatalog.java      |  5 ++
 .../starrocks/catalog/StarRocksCatalog.java        | 37 +++++++----
 12 files changed, 287 insertions(+), 90 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 93c17c10fb..560fa98d3b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -24,13 +24,20 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 /**
@@ -58,6 +65,9 @@ public interface Catalog extends AutoCloseable {
      */
     void close() throws CatalogException;
 
+    /** Get the name of the catalog. */
+    String name();
+
     // 
--------------------------------------------------------------------------------------------
     // database
     // 
--------------------------------------------------------------------------------------------
@@ -124,15 +134,10 @@ public interface Catalog extends AutoCloseable {
     default List<CatalogTable> getTables(ReadonlyConfig config) throws 
CatalogException {
         // Get the list of specified tables
         List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
-        List<CatalogTable> catalogTables = new ArrayList<>();
         if (tableNames != null && !tableNames.isEmpty()) {
-            for (String tableName : tableNames) {
-                TablePath tablePath = TablePath.of(tableName);
-                if (this.tableExists(tablePath)) {
-                    catalogTables.add(this.getTable(tablePath));
-                }
-            }
-            return catalogTables;
+            Iterator<TablePath> tablePaths =
+                    
tableNames.stream().map(TablePath::of).filter(this::tableExists).iterator();
+            return buildCatalogTablesWithErrorCheck(tablePaths);
         }
 
         // Get the list of table pattern
@@ -144,17 +149,66 @@ public interface Catalog extends AutoCloseable {
         Pattern tablePattern = 
Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN));
         List<String> allDatabase = this.listDatabases();
         allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
+        List<TablePath> tablePaths = new ArrayList<>();
         for (String databaseName : allDatabase) {
             tableNames = this.listTables(databaseName);
-            for (String tableName : tableNames) {
-                if (tablePattern.matcher(databaseName + "." + 
tableName).matches()) {
-                    catalogTables.add(this.getTable(TablePath.of(databaseName, 
tableName)));
+            tableNames.forEach(
+                    tableName -> {
+                        if (tablePattern.matcher(databaseName + "." + 
tableName).matches()) {
+                            tablePaths.add(TablePath.of(databaseName, 
tableName));
+                        }
+                    });
+        }
+        return buildCatalogTablesWithErrorCheck(tablePaths.iterator());
+    }
+
+    default List<CatalogTable> 
buildCatalogTablesWithErrorCheck(Iterator<TablePath> tablePaths) {
+        Map<String, Map<String, String>> unsupportedTable = new 
LinkedHashMap<>();
+        List<CatalogTable> catalogTables = new ArrayList<>();
+        while (tablePaths.hasNext()) {
+            try {
+                catalogTables.add(getTable(tablePaths.next()));
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        
.equals(CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) {
+                    unsupportedTable.put(
+                            e.getParams().get("tableName"),
+                            e.getParamsValueAsMap("fieldWithDataTypes"));
+                } else {
+                    throw e;
                 }
             }
         }
+        if (!unsupportedTable.isEmpty()) {
+            throw CommonError.getCatalogTablesWithUnsupportedType(name(), 
unsupportedTable);
+        }
         return catalogTables;
     }
 
+    default <T> void buildColumnsWithErrorCheck(
+            TablePath tablePath,
+            TableSchema.Builder builder,
+            Iterator<T> keys,
+            Function<T, Column> getColumn) {
+        Map<String, String> unsupported = new LinkedHashMap<>();
+        while (keys.hasNext()) {
+            try {
+                builder.column(getColumn.apply(keys.next()));
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                    unsupported.put(e.getParams().get("field"), 
e.getParams().get("dataType"));
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (!unsupported.isEmpty()) {
+            throw CommonError.getCatalogTableWithUnsupportedType(
+                    name(), tablePath.getFullName(), unsupported);
+        }
+    }
+
     /**
      * Create a new table in this catalog.
      *
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
index 6bb0890f21..d3c7692b60 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 public class CatalogTableTest {
 
@@ -35,4 +43,50 @@ public class CatalogTableTest {
         catalogTable.getOptions().put("test", "value");
         catalogTable.getPartitionKeys().add("test");
     }
+
+    @Test
+    public void testReadCatalogTableWithUnsupportedType() {
+        Catalog catalog =
+                new InMemoryCatalogFactory()
+                        .createCatalog("InMemory", ReadonlyConfig.fromMap(new 
HashMap<>()));
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                catalog.getTables(
+                                        ReadonlyConfig.fromMap(
+                                                new HashMap<String, Object>() {
+                                                    {
+                                                        put(
+                                                                
CatalogOptions.TABLE_NAMES.key(),
+                                                                Arrays.asList(
+                                                                        
"unsupported.public.table1",
+                                                                        
"unsupported.public.table2"));
+                                                    }
+                                                })));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-21], ErrorDescription:['InMemory' tables 
unsupported get catalog table,"
+                        + "the corresponding field types in the following 
tables are not supported: '{\"unsupported.public.table1\""
+                        + 
":{\"field1\":\"interval\",\"field2\":\"interval2\"},\"unsupported.public.table2\":{\"field1\":\"interval\","
+                        + "\"field2\":\"interval2\"}}']",
+                exception.getMessage());
+        Map<String, Map<String, String>> result = new LinkedHashMap<>();
+        result.put(
+                "unsupported.public.table1",
+                new HashMap<String, String>() {
+                    {
+                        put("field1", "interval");
+                        put("field2", "interval2");
+                    }
+                });
+        result.put(
+                "unsupported.public.table2",
+                new HashMap<String, String>() {
+                    {
+                        put("field1", "interval");
+                        put("field2", "interval2");
+                    }
+                });
+        Assertions.assertEquals(result, 
exception.getParamsValueAs("tableUnsupportedTypes"));
+    }
 }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
index 572955bda5..c745835abd 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
@@ -25,11 +25,15 @@ import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +46,7 @@ public class InMemoryCatalog implements Catalog {
     // database -> tables
     private final Map<String, List<CatalogTable>> catalogTables;
     private static final String DEFAULT_DATABASE = "default";
+    private static final String UNSUPPORTED_DATABASE = "unsupported";
 
     InMemoryCatalog(String catalogName, ReadonlyConfig options) {
         this.name = catalogName;
@@ -53,6 +58,7 @@ public class InMemoryCatalog implements Catalog {
     // Add some default table for testing
     private void addDefaultTable() {
         this.catalogTables.put(DEFAULT_DATABASE, new ArrayList<>());
+        this.catalogTables.put(UNSUPPORTED_DATABASE, new ArrayList<>());
         List<CatalogTable> tables = new ArrayList<>();
         this.catalogTables.put("st", tables);
         TableSchema tableSchema =
@@ -92,20 +98,40 @@ public class InMemoryCatalog implements Catalog {
         CatalogTable catalogTable1 =
                 CatalogTable.of(
                         TableIdentifier.of(name, TablePath.of("st", "public", 
"table1")),
-                        tableSchema,
+                        TableSchema.builder().build(),
                         new HashMap<>(),
                         new ArrayList<>(),
                         "In Memory Table");
         CatalogTable catalogTable2 =
                 CatalogTable.of(
                         TableIdentifier.of(name, TablePath.of("st", "public", 
"table2")),
-                        tableSchema,
+                        TableSchema.builder().build(),
                         new HashMap<>(),
                         new ArrayList<>(),
                         "In Memory Table",
                         name);
         tables.add(catalogTable1);
         tables.add(catalogTable2);
+
+        CatalogTable unsupportedTable1 =
+                CatalogTable.of(
+                        TableIdentifier.of(
+                                name, TablePath.of(UNSUPPORTED_DATABASE, 
"public", "table1")),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "In Memory Table");
+        CatalogTable unsupportedTable2 =
+                CatalogTable.of(
+                        TableIdentifier.of(
+                                name, TablePath.of(UNSUPPORTED_DATABASE, 
"public", "table2")),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "In Memory Table",
+                        name);
+        this.catalogTables.get(UNSUPPORTED_DATABASE).add(unsupportedTable1);
+        this.catalogTables.get(UNSUPPORTED_DATABASE).add(unsupportedTable2);
     }
 
     @Override
@@ -125,6 +151,11 @@ public class InMemoryCatalog implements Catalog {
         log.trace(String.format("InMemoryCatalog %s closing", name));
     }
 
+    @Override
+    public String name() {
+        return "InMemory";
+    }
+
     @Override
     public String getDefaultDatabase() throws CatalogException {
         return DEFAULT_DATABASE;
@@ -165,6 +196,19 @@ public class InMemoryCatalog implements Catalog {
     public CatalogTable getTable(TablePath tablePath)
             throws CatalogException, TableNotExistException {
         if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+            if (tablePath.getDatabaseName().equals(UNSUPPORTED_DATABASE)) {
+                List<Pair<String, String>> unsupportedFields =
+                        Arrays.asList(
+                                Pair.of("field1", "interval"), 
Pair.of("field2", "interval2"));
+                buildColumnsWithErrorCheck(
+                        tablePath,
+                        new TableSchema.Builder(),
+                        unsupportedFields.iterator(),
+                        field -> {
+                            throw CommonError.convertToSeaTunnelTypeError(
+                                    name(), field.getValue(), field.getKey());
+                        });
+            }
             List<CatalogTable> tables = 
catalogTables.get(tablePath.getDatabaseName());
             return tables.stream()
                     .filter(t -> 
t.getTableId().toTablePath().equals(tablePath))
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 8ca60925b0..1d5ba105f0 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -91,6 +91,11 @@ public class ElasticSearchCatalog implements Catalog {
         esRestClient.close();
     }
 
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
     @Override
     public String getDefaultDatabase() throws CatalogException {
         return defaultDatabase;
@@ -142,19 +147,20 @@ public class ElasticSearchCatalog implements Catalog {
         TableSchema.Builder builder = TableSchema.builder();
         Map<String, String> fieldTypeMapping =
                 esRestClient.getFieldTypeMapping(tablePath.getTableName(), 
Collections.emptyList());
-        fieldTypeMapping.forEach(
-                (fieldName, fieldType) -> {
+        buildColumnsWithErrorCheck(
+                tablePath,
+                builder,
+                fieldTypeMapping.entrySet().iterator(),
+                nameAndType -> {
                     // todo: we need to add a new type TEXT or add length in 
STRING type
-                    PhysicalColumn physicalColumn =
-                            PhysicalColumn.of(
-                                    fieldName,
-                                    
elasticSearchDataTypeConvertor.toSeaTunnelType(
-                                            fieldName, fieldType),
-                                    null,
-                                    true,
-                                    null,
-                                    null);
-                    builder.column(physicalColumn);
+                    return PhysicalColumn.of(
+                            nameAndType.getKey(),
+                            elasticSearchDataTypeConvertor.toSeaTunnelType(
+                                    nameAndType.getKey(), 
nameAndType.getValue()),
+                            null,
+                            true,
+                            null,
+                            null);
                 });
 
         return CatalogTable.of(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 291333459e..f695cc30cb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -99,6 +99,11 @@ public abstract class AbstractJdbcCatalog implements Catalog 
{
         this.connectionMap = new ConcurrentHashMap<>();
     }
 
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
     @Override
     public String getDefaultDatabase() {
         return defaultDatabase;
@@ -174,24 +179,7 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
                     ResultSet resultSet = ps.executeQuery()) {
 
                 TableSchema.Builder builder = TableSchema.builder();
-                Map<String, String> unsupported = new LinkedHashMap<>();
-                while (resultSet.next()) {
-                    try {
-                        builder.column(buildColumn(resultSet));
-                    } catch (SeaTunnelRuntimeException e) {
-                        if (e.getSeaTunnelErrorCode()
-                                
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
-                            unsupported.put(
-                                    e.getParams().get("field"), 
e.getParams().get("dataType"));
-                        } else {
-                            throw e;
-                        }
-                    }
-                }
-                if (!unsupported.isEmpty()) {
-                    throw CommonError.getCatalogTableWithUnsupportedType(
-                            catalogName, tablePath.getFullName(), unsupported);
-                }
+                buildColumnsWithErrorCheck(tablePath, resultSet, builder);
                 // add primary key
                 primaryKey.ifPresent(builder::primaryKey);
                 // add constraint key
@@ -213,6 +201,28 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         }
     }
 
+    protected void buildColumnsWithErrorCheck(
+            TablePath tablePath, ResultSet resultSet, TableSchema.Builder 
builder)
+            throws SQLException {
+        Map<String, String> unsupported = new LinkedHashMap<>();
+        while (resultSet.next()) {
+            try {
+                builder.column(buildColumn(resultSet));
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                    unsupported.put(e.getParams().get("field"), 
e.getParams().get("dataType"));
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (!unsupported.isEmpty()) {
+            throw CommonError.getCatalogTableWithUnsupportedType(
+                    catalogName, tablePath.getFullName(), unsupported);
+        }
+    }
+
     protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, 
TablePath tablePath)
             throws SQLException {
         return getPrimaryKey(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index edf3e1b380..219067b68f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -35,8 +35,10 @@ import org.apache.commons.lang3.StringUtils;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -177,6 +179,33 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         return listTables(databases.get(0));
     }
 
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
+        }
+
+        try (PreparedStatement ps =
+                        getConnection(defaultUrl)
+                                .prepareStatement("SELECT OWNER, TABLE_NAME 
FROM ALL_TABLES");
+                ResultSet rs = ps.executeQuery()) {
+
+            List<String> tables = new ArrayList<>();
+            while (rs.next()) {
+                if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+                    continue;
+                }
+                tables.add(rs.getString(1) + "." + rs.getString(2));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed listing table in catalog %s", 
catalogName), e);
+        }
+    }
+
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
         Connection defaultConnection = getConnection(defaultUrl);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 2ff7b399f3..e21ec7d3d7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -20,9 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -36,14 +34,11 @@ import com.mysql.cj.MysqlType;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Optional;
 
 @Slf4j
 public class MySqlCatalog extends AbstractJdbcCatalog {
@@ -97,26 +92,6 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
                 catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
 
-    @Override
-    protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, 
TablePath tablePath)
-            throws SQLException {
-        return getPrimaryKey(
-                metaData,
-                tablePath.getDatabaseName(),
-                tablePath.getTableName(),
-                tablePath.getTableName());
-    }
-
-    @Override
-    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
-            throws SQLException {
-        return getConstraintKeys(
-                metaData,
-                tablePath.getDatabaseName(),
-                tablePath.getTableName(),
-                tablePath.getTableName());
-    }
-
     @Override
     protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index a45f7ce7c5..4d70b369c3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -76,7 +76,7 @@ public class JdbcCatalogUtils {
                 log.info("Loading catalog tables for catalog : {}", 
jdbcCatalog.getClass());
 
                 jdbcCatalog.open();
-                Map<String, Map<String, String>> unsupportedTable = new 
HashMap<>();
+                Map<String, Map<String, String>> unsupportedTable = new 
LinkedHashMap<>();
                 for (JdbcSourceTableConfig tableConfig : tablesConfig) {
                     try {
                         CatalogTable catalogTable =
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
index 0fa40d8b0e..b5ba121fb5 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS;
@@ -89,6 +90,11 @@ public class KuduCatalog implements Catalog {
         }
     }
 
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
     @Override
     public String getDefaultDatabase() throws CatalogException {
         return defaultDatabase;
@@ -143,17 +149,20 @@ public class KuduCatalog implements Catalog {
             kuduTable.getPartitionSchema();
             List<ColumnSchema> columnSchemaList = schema.getColumns();
             Optional<PrimaryKey> primaryKey = 
getPrimaryKey(schema.getPrimaryKeyColumns());
-            for (int i = 0; i < columnSchemaList.size(); i++) {
-                SeaTunnelDataType<?> type = 
KuduTypeMapper.mapping(columnSchemaList, i);
-                builder.column(
-                        PhysicalColumn.of(
+            buildColumnsWithErrorCheck(
+                    tablePath,
+                    builder,
+                    IntStream.range(0, columnSchemaList.size()).iterator(),
+                    i -> {
+                        SeaTunnelDataType<?> type = 
KuduTypeMapper.mapping(columnSchemaList, i);
+                        return PhysicalColumn.of(
                                 columnSchemaList.get(i).getName(),
                                 type,
                                 columnSchemaList.get(i).getTypeSize(),
                                 columnSchemaList.get(i).isNullable(),
                                 columnSchemaList.get(i).getDefaultValue(),
-                                columnSchemaList.get(i).getComment()));
-            }
+                                columnSchemaList.get(i).getComment());
+                    });
 
             primaryKey.ifPresent(builder::primaryKey);
 
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
index a088ed2107..7c8383c8ff 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
@@ -32,15 +32,13 @@ import org.apache.kudu.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.List;
 
 public class KuduTypeMapper {
 
     private static final Logger log = 
LoggerFactory.getLogger(KuduTypeMapper.class);
 
-    public static SeaTunnelDataType<?> mapping(List<ColumnSchema> 
columnSchemaList, int colIndex)
-            throws SQLException {
+    public static SeaTunnelDataType<?> mapping(List<ColumnSchema> 
columnSchemaList, int colIndex) {
         Type kuduType = columnSchemaList.get(colIndex).getType();
         switch (kuduType) {
             case BOOL:
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index b131277bd7..6477eb2e36 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -66,6 +66,11 @@ public class MaxComputeCatalog implements Catalog {
     @Override
     public void close() throws CatalogException {}
 
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
     @Override
     public String getDefaultDatabase() throws CatalogException {
         return readonlyConfig.get(PROJECT);
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 5caf035889..0c37322199 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.IntStream;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 
@@ -167,18 +168,25 @@ public class StarRocksCatalog implements Catalog {
             ResultSetMetaData tableMetaData = ps.getMetaData();
 
             TableSchema.Builder builder = TableSchema.builder();
-            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                // TODO add default value and test it
-                builder.column(
-                        PhysicalColumn.of(
-                                tableMetaData.getColumnName(i),
-                                type,
-                                tableMetaData.getColumnDisplaySize(i),
-                                tableMetaData.isNullable(i) == 
ResultSetMetaData.columnNullable,
-                                null,
-                                tableMetaData.getColumnLabel(i)));
-            }
+            buildColumnsWithErrorCheck(
+                    tablePath,
+                    builder,
+                    IntStream.range(1, tableMetaData.getColumnCount() + 
1).iterator(),
+                    i -> {
+                        try {
+                            SeaTunnelDataType<?> type = 
fromJdbcType(tableMetaData, i);
+                            // TODO add default value and test it
+                            return PhysicalColumn.of(
+                                    tableMetaData.getColumnName(i),
+                                    type,
+                                    tableMetaData.getColumnDisplaySize(i),
+                                    tableMetaData.isNullable(i) == 
ResultSetMetaData.columnNullable,
+                                    null,
+                                    tableMetaData.getColumnLabel(i));
+                        } catch (SQLException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
 
             primaryKey.ifPresent(builder::primaryKey);
 
@@ -392,6 +400,11 @@ public class StarRocksCatalog implements Catalog {
         LOG.info("Catalog {} closing", catalogName);
     }
 
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) 
throws SQLException {
 
         List<String> pkFields = new ArrayList<>();


Reply via email to