This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b9791285a0 [Feature] Add unsupported datatype check for all catalog
(#5890)
b9791285a0 is described below
commit b9791285a09fa247ce542a59423479282b23f1f9
Author: Jia Fan <[email protected]>
AuthorDate: Sun Dec 10 16:09:35 2023 +0800
[Feature] Add unsupported datatype check for all catalog (#5890)
* [Feature] Add unsupported datatype check for all catalog
* update
* update
---
.../seatunnel/api/table/catalog/Catalog.java | 76 ++++++++++++++++++----
.../api/table/catalog/CatalogTableTest.java | 54 +++++++++++++++
.../api/table/catalog/InMemoryCatalog.java | 48 +++++++++++++-
.../catalog/ElasticSearchCatalog.java | 30 +++++----
.../jdbc/catalog/AbstractJdbcCatalog.java | 46 ++++++++-----
.../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 29 +++++++++
.../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 25 -------
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 2 +-
.../seatunnel/kudu/catalog/KuduCatalog.java | 21 ++++--
.../seatunnel/kudu/kuduclient/KuduTypeMapper.java | 4 +-
.../maxcompute/catalog/MaxComputeCatalog.java | 5 ++
.../starrocks/catalog/StarRocksCatalog.java | 37 +++++++----
12 files changed, 287 insertions(+), 90 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 93c17c10fb..560fa98d3b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -24,13 +24,20 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.regex.Pattern;
/**
@@ -58,6 +65,9 @@ public interface Catalog extends AutoCloseable {
*/
void close() throws CatalogException;
+ /** Get the name of the catalog. */
+ String name();
+
//
--------------------------------------------------------------------------------------------
// database
//
--------------------------------------------------------------------------------------------
@@ -124,15 +134,10 @@ public interface Catalog extends AutoCloseable {
default List<CatalogTable> getTables(ReadonlyConfig config) throws
CatalogException {
// Get the list of specified tables
List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
- List<CatalogTable> catalogTables = new ArrayList<>();
if (tableNames != null && !tableNames.isEmpty()) {
- for (String tableName : tableNames) {
- TablePath tablePath = TablePath.of(tableName);
- if (this.tableExists(tablePath)) {
- catalogTables.add(this.getTable(tablePath));
- }
- }
- return catalogTables;
+ Iterator<TablePath> tablePaths =
+
tableNames.stream().map(TablePath::of).filter(this::tableExists).iterator();
+ return buildCatalogTablesWithErrorCheck(tablePaths);
}
// Get the list of table pattern
@@ -144,17 +149,66 @@ public interface Catalog extends AutoCloseable {
Pattern tablePattern =
Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN));
List<String> allDatabase = this.listDatabases();
allDatabase.removeIf(s -> !databasePattern.matcher(s).matches());
+ List<TablePath> tablePaths = new ArrayList<>();
for (String databaseName : allDatabase) {
tableNames = this.listTables(databaseName);
- for (String tableName : tableNames) {
- if (tablePattern.matcher(databaseName + "." +
tableName).matches()) {
- catalogTables.add(this.getTable(TablePath.of(databaseName,
tableName)));
+ tableNames.forEach(
+ tableName -> {
+ if (tablePattern.matcher(databaseName + "." +
tableName).matches()) {
+ tablePaths.add(TablePath.of(databaseName,
tableName));
+ }
+ });
+ }
+ return buildCatalogTablesWithErrorCheck(tablePaths.iterator());
+ }
+
+ default List<CatalogTable>
buildCatalogTablesWithErrorCheck(Iterator<TablePath> tablePaths) {
+ Map<String, Map<String, String>> unsupportedTable = new
LinkedHashMap<>();
+ List<CatalogTable> catalogTables = new ArrayList<>();
+ while (tablePaths.hasNext()) {
+ try {
+ catalogTables.add(getTable(tablePaths.next()));
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+
.equals(CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) {
+ unsupportedTable.put(
+ e.getParams().get("tableName"),
+ e.getParamsValueAsMap("fieldWithDataTypes"));
+ } else {
+ throw e;
}
}
}
+ if (!unsupportedTable.isEmpty()) {
+ throw CommonError.getCatalogTablesWithUnsupportedType(name(),
unsupportedTable);
+ }
return catalogTables;
}
+ default <T> void buildColumnsWithErrorCheck(
+ TablePath tablePath,
+ TableSchema.Builder builder,
+ Iterator<T> keys,
+ Function<T, Column> getColumn) {
+ Map<String, String> unsupported = new LinkedHashMap<>();
+ while (keys.hasNext()) {
+ try {
+ builder.column(getColumn.apply(keys.next()));
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+ unsupported.put(e.getParams().get("field"),
e.getParams().get("dataType"));
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (!unsupported.isEmpty()) {
+ throw CommonError.getCatalogTableWithUnsupportedType(
+ name(), tablePath.getFullName(), unsupported);
+ }
+ }
+
/**
* Create a new table in this catalog.
*
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
index 6bb0890f21..d3c7692b60 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java
@@ -17,9 +17,17 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
public class CatalogTableTest {
@@ -35,4 +43,50 @@ public class CatalogTableTest {
catalogTable.getOptions().put("test", "value");
catalogTable.getPartitionKeys().add("test");
}
+
+ @Test
+ public void testReadCatalogTableWithUnsupportedType() {
+ Catalog catalog =
+ new InMemoryCatalogFactory()
+ .createCatalog("InMemory", ReadonlyConfig.fromMap(new
HashMap<>()));
+ SeaTunnelRuntimeException exception =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ catalog.getTables(
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+
CatalogOptions.TABLE_NAMES.key(),
+ Arrays.asList(
+
"unsupported.public.table1",
+
"unsupported.public.table2"));
+ }
+ })));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-21], ErrorDescription:['InMemory' tables
unsupported get catalog tableļ¼"
+ + "the corresponding field types in the following
tables are not supported: '{\"unsupported.public.table1\""
+ +
":{\"field1\":\"interval\",\"field2\":\"interval2\"},\"unsupported.public.table2\":{\"field1\":\"interval\","
+ + "\"field2\":\"interval2\"}}']",
+ exception.getMessage());
+ Map<String, Map<String, String>> result = new LinkedHashMap<>();
+ result.put(
+ "unsupported.public.table1",
+ new HashMap<String, String>() {
+ {
+ put("field1", "interval");
+ put("field2", "interval2");
+ }
+ });
+ result.put(
+ "unsupported.public.table2",
+ new HashMap<String, String>() {
+ {
+ put("field1", "interval");
+ put("field2", "interval2");
+ }
+ });
+ Assertions.assertEquals(result,
exception.getParamsValueAs("tableUnsupportedTypes"));
+ }
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
index 572955bda5..c745835abd 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java
@@ -25,11 +25,15 @@ import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistExcepti
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.apache.commons.lang3.tuple.Pair;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +46,7 @@ public class InMemoryCatalog implements Catalog {
// database -> tables
private final Map<String, List<CatalogTable>> catalogTables;
private static final String DEFAULT_DATABASE = "default";
+ private static final String UNSUPPORTED_DATABASE = "unsupported";
InMemoryCatalog(String catalogName, ReadonlyConfig options) {
this.name = catalogName;
@@ -53,6 +58,7 @@ public class InMemoryCatalog implements Catalog {
// Add some default table for testing
private void addDefaultTable() {
this.catalogTables.put(DEFAULT_DATABASE, new ArrayList<>());
+ this.catalogTables.put(UNSUPPORTED_DATABASE, new ArrayList<>());
List<CatalogTable> tables = new ArrayList<>();
this.catalogTables.put("st", tables);
TableSchema tableSchema =
@@ -92,20 +98,40 @@ public class InMemoryCatalog implements Catalog {
CatalogTable catalogTable1 =
CatalogTable.of(
TableIdentifier.of(name, TablePath.of("st", "public",
"table1")),
- tableSchema,
+ TableSchema.builder().build(),
new HashMap<>(),
new ArrayList<>(),
"In Memory Table");
CatalogTable catalogTable2 =
CatalogTable.of(
TableIdentifier.of(name, TablePath.of("st", "public",
"table2")),
- tableSchema,
+ TableSchema.builder().build(),
new HashMap<>(),
new ArrayList<>(),
"In Memory Table",
name);
tables.add(catalogTable1);
tables.add(catalogTable2);
+
+ CatalogTable unsupportedTable1 =
+ CatalogTable.of(
+ TableIdentifier.of(
+ name, TablePath.of(UNSUPPORTED_DATABASE,
"public", "table1")),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "In Memory Table");
+ CatalogTable unsupportedTable2 =
+ CatalogTable.of(
+ TableIdentifier.of(
+ name, TablePath.of(UNSUPPORTED_DATABASE,
"public", "table2")),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "In Memory Table",
+ name);
+ this.catalogTables.get(UNSUPPORTED_DATABASE).add(unsupportedTable1);
+ this.catalogTables.get(UNSUPPORTED_DATABASE).add(unsupportedTable2);
}
@Override
@@ -125,6 +151,11 @@ public class InMemoryCatalog implements Catalog {
log.trace(String.format("InMemoryCatalog %s closing", name));
}
+ @Override
+ public String name() {
+ return "InMemory";
+ }
+
@Override
public String getDefaultDatabase() throws CatalogException {
return DEFAULT_DATABASE;
@@ -165,6 +196,19 @@ public class InMemoryCatalog implements Catalog {
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
if (catalogTables.containsKey(tablePath.getDatabaseName())) {
+ if (tablePath.getDatabaseName().equals(UNSUPPORTED_DATABASE)) {
+ List<Pair<String, String>> unsupportedFields =
+ Arrays.asList(
+ Pair.of("field1", "interval"),
Pair.of("field2", "interval2"));
+ buildColumnsWithErrorCheck(
+ tablePath,
+ new TableSchema.Builder(),
+ unsupportedFields.iterator(),
+ field -> {
+ throw CommonError.convertToSeaTunnelTypeError(
+ name(), field.getValue(), field.getKey());
+ });
+ }
List<CatalogTable> tables =
catalogTables.get(tablePath.getDatabaseName());
return tables.stream()
.filter(t ->
t.getTableId().toTablePath().equals(tablePath))
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 8ca60925b0..1d5ba105f0 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -91,6 +91,11 @@ public class ElasticSearchCatalog implements Catalog {
esRestClient.close();
}
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
@Override
public String getDefaultDatabase() throws CatalogException {
return defaultDatabase;
@@ -142,19 +147,20 @@ public class ElasticSearchCatalog implements Catalog {
TableSchema.Builder builder = TableSchema.builder();
Map<String, String> fieldTypeMapping =
esRestClient.getFieldTypeMapping(tablePath.getTableName(),
Collections.emptyList());
- fieldTypeMapping.forEach(
- (fieldName, fieldType) -> {
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ fieldTypeMapping.entrySet().iterator(),
+ nameAndType -> {
// todo: we need to add a new type TEXT or add length in
STRING type
- PhysicalColumn physicalColumn =
- PhysicalColumn.of(
- fieldName,
-
elasticSearchDataTypeConvertor.toSeaTunnelType(
- fieldName, fieldType),
- null,
- true,
- null,
- null);
- builder.column(physicalColumn);
+ return PhysicalColumn.of(
+ nameAndType.getKey(),
+ elasticSearchDataTypeConvertor.toSeaTunnelType(
+ nameAndType.getKey(),
nameAndType.getValue()),
+ null,
+ true,
+ null,
+ null);
});
return CatalogTable.of(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 291333459e..f695cc30cb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -99,6 +99,11 @@ public abstract class AbstractJdbcCatalog implements Catalog
{
this.connectionMap = new ConcurrentHashMap<>();
}
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
@Override
public String getDefaultDatabase() {
return defaultDatabase;
@@ -174,24 +179,7 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
ResultSet resultSet = ps.executeQuery()) {
TableSchema.Builder builder = TableSchema.builder();
- Map<String, String> unsupported = new LinkedHashMap<>();
- while (resultSet.next()) {
- try {
- builder.column(buildColumn(resultSet));
- } catch (SeaTunnelRuntimeException e) {
- if (e.getSeaTunnelErrorCode()
-
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
- unsupported.put(
- e.getParams().get("field"),
e.getParams().get("dataType"));
- } else {
- throw e;
- }
- }
- }
- if (!unsupported.isEmpty()) {
- throw CommonError.getCatalogTableWithUnsupportedType(
- catalogName, tablePath.getFullName(), unsupported);
- }
+ buildColumnsWithErrorCheck(tablePath, resultSet, builder);
// add primary key
primaryKey.ifPresent(builder::primaryKey);
// add constraint key
@@ -213,6 +201,28 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
}
}
+ protected void buildColumnsWithErrorCheck(
+ TablePath tablePath, ResultSet resultSet, TableSchema.Builder
builder)
+ throws SQLException {
+ Map<String, String> unsupported = new LinkedHashMap<>();
+ while (resultSet.next()) {
+ try {
+ builder.column(buildColumn(resultSet));
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+ unsupported.put(e.getParams().get("field"),
e.getParams().get("dataType"));
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (!unsupported.isEmpty()) {
+ throw CommonError.getCatalogTableWithUnsupportedType(
+ catalogName, tablePath.getFullName(), unsupported);
+ }
+ }
+
protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData,
TablePath tablePath)
throws SQLException {
return getPrimaryKey(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index edf3e1b380..219067b68f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -35,8 +35,10 @@ import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -177,6 +179,33 @@ public class DamengCatalog extends AbstractJdbcCatalog {
return listTables(databases.get(0));
}
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(this.catalogName,
databaseName);
+ }
+
+ try (PreparedStatement ps =
+ getConnection(defaultUrl)
+ .prepareStatement("SELECT OWNER, TABLE_NAME
FROM ALL_TABLES");
+ ResultSet rs = ps.executeQuery()) {
+
+ List<String> tables = new ArrayList<>();
+ while (rs.next()) {
+ if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+ continue;
+ }
+ tables.add(rs.getString(1) + "." + rs.getString(2));
+ }
+
+ return tables;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing table in catalog %s",
catalogName), e);
+ }
+ }
+
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index 2ff7b399f3..e21ec7d3d7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -20,9 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -36,14 +34,11 @@ import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
@Slf4j
public class MySqlCatalog extends AbstractJdbcCatalog {
@@ -97,26 +92,6 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
catalogName, tablePath.getDatabaseName(),
tablePath.getTableName());
}
- @Override
- protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData,
TablePath tablePath)
- throws SQLException {
- return getPrimaryKey(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getTableName(),
- tablePath.getTableName());
- }
-
- @Override
- protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
- throws SQLException {
- return getConstraintKeys(
- metaData,
- tablePath.getDatabaseName(),
- tablePath.getTableName(),
- tablePath.getTableName());
- }
-
@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index a45f7ce7c5..4d70b369c3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -76,7 +76,7 @@ public class JdbcCatalogUtils {
log.info("Loading catalog tables for catalog : {}",
jdbcCatalog.getClass());
jdbcCatalog.open();
- Map<String, Map<String, String>> unsupportedTable = new
HashMap<>();
+ Map<String, Map<String, String>> unsupportedTable = new
LinkedHashMap<>();
for (JdbcSourceTableConfig tableConfig : tablesConfig) {
try {
CatalogTable catalogTable =
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
index 0fa40d8b0e..b5ba121fb5 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
@@ -48,6 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static
org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT;
import static
org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS;
@@ -89,6 +90,11 @@ public class KuduCatalog implements Catalog {
}
}
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
@Override
public String getDefaultDatabase() throws CatalogException {
return defaultDatabase;
@@ -143,17 +149,20 @@ public class KuduCatalog implements Catalog {
kuduTable.getPartitionSchema();
List<ColumnSchema> columnSchemaList = schema.getColumns();
Optional<PrimaryKey> primaryKey =
getPrimaryKey(schema.getPrimaryKeyColumns());
- for (int i = 0; i < columnSchemaList.size(); i++) {
- SeaTunnelDataType<?> type =
KuduTypeMapper.mapping(columnSchemaList, i);
- builder.column(
- PhysicalColumn.of(
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ IntStream.range(0, columnSchemaList.size()).iterator(),
+ i -> {
+ SeaTunnelDataType<?> type =
KuduTypeMapper.mapping(columnSchemaList, i);
+ return PhysicalColumn.of(
columnSchemaList.get(i).getName(),
type,
columnSchemaList.get(i).getTypeSize(),
columnSchemaList.get(i).isNullable(),
columnSchemaList.get(i).getDefaultValue(),
- columnSchemaList.get(i).getComment()));
- }
+ columnSchemaList.get(i).getComment());
+ });
primaryKey.ifPresent(builder::primaryKey);
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
index a088ed2107..7c8383c8ff 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
@@ -32,15 +32,13 @@ import org.apache.kudu.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.SQLException;
import java.util.List;
public class KuduTypeMapper {
private static final Logger log =
LoggerFactory.getLogger(KuduTypeMapper.class);
- public static SeaTunnelDataType<?> mapping(List<ColumnSchema>
columnSchemaList, int colIndex)
- throws SQLException {
+ public static SeaTunnelDataType<?> mapping(List<ColumnSchema>
columnSchemaList, int colIndex) {
Type kuduType = columnSchemaList.get(colIndex).getType();
switch (kuduType) {
case BOOL:
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
index b131277bd7..6477eb2e36 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -66,6 +66,11 @@ public class MaxComputeCatalog implements Catalog {
@Override
public void close() throws CatalogException {}
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
@Override
public String getDefaultDatabase() throws CatalogException {
return readonlyConfig.get(PROJECT);
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 5caf035889..0c37322199 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.IntStream;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -167,18 +168,25 @@ public class StarRocksCatalog implements Catalog {
ResultSetMetaData tableMetaData = ps.getMetaData();
TableSchema.Builder builder = TableSchema.builder();
- for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
- SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
- // TODO add default value and test it
- builder.column(
- PhysicalColumn.of(
- tableMetaData.getColumnName(i),
- type,
- tableMetaData.getColumnDisplaySize(i),
- tableMetaData.isNullable(i) ==
ResultSetMetaData.columnNullable,
- null,
- tableMetaData.getColumnLabel(i)));
- }
+ buildColumnsWithErrorCheck(
+ tablePath,
+ builder,
+ IntStream.range(1, tableMetaData.getColumnCount() +
1).iterator(),
+ i -> {
+ try {
+ SeaTunnelDataType<?> type =
fromJdbcType(tableMetaData, i);
+ // TODO add default value and test it
+ return PhysicalColumn.of(
+ tableMetaData.getColumnName(i),
+ type,
+ tableMetaData.getColumnDisplaySize(i),
+ tableMetaData.isNullable(i) ==
ResultSetMetaData.columnNullable,
+ null,
+ tableMetaData.getColumnLabel(i));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
primaryKey.ifPresent(builder::primaryKey);
@@ -392,6 +400,11 @@ public class StarRocksCatalog implements Catalog {
LOG.info("Catalog {} closing", catalogName);
}
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
protected Optional<PrimaryKey> getPrimaryKey(String schema, String table)
throws SQLException {
List<String> pkFields = new ArrayList<>();