This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch apache_240219_doris_type_converter in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit b5799e44501be7cc66a77691e8744d59e0dd4831 Author: Eric <[email protected]> AuthorDate: Fri Jan 26 11:32:57 2024 +0800 Fix DEFAULT TABLE problem --- .../api/table/catalog/CatalogTableUtil.java | 70 +++++++++++++++++++++- .../jdbc/internal/dialect/mysql/MysqlDialect.java | 8 ++- .../internal/dialect/oracle/OracleDialect.java | 8 ++- .../internal/dialect/psql/PostgresDialect.java | 9 ++- .../dialect/sqlserver/SqlServerDialect.java | 8 ++- 5 files changed, 92 insertions(+), 11 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index 6f2b6adeb2..1794fd0949 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -37,11 +37,15 @@ import lombok.extern.slf4j.Slf4j; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; /** Utils contains some common methods for construct CatalogTable. */ @Slf4j @@ -115,14 +119,14 @@ public class CatalogTableUtil implements Serializable { return optionalCatalog .map( c -> { - long startTime = System.currentTimeMillis(); try (Catalog catalog = c) { + long startTime = System.currentTimeMillis(); catalog.open(); List<CatalogTable> catalogTables = catalog.getTables(readonlyConfig); log.info( String.format( - "Get catalog tables, cost time: %d", + "Get catalog tables, cost time: %d/ms", System.currentTimeMillis() - startTime)); if (catalogTables.isEmpty()) { throw new SeaTunnelException( @@ -234,4 +238,66 @@ public class CatalogTableUtil implements Serializable { public static CatalogTable buildSimpleTextTable() { return getCatalogTable("default", buildSimpleTextSchema()); } + + public static SeaTunnelDataType toSeaTunnelRowType(Collection<CatalogTable> catalogTables) { + + if (catalogTables.size() == 1) { + Iterator<CatalogTable> iterator = catalogTables.iterator(); + return iterator.next().getSeaTunnelRowType(); + } + + Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>(); + + for (CatalogTable catalogTable : catalogTables) { + String tableId = catalogTable.getTableId().toTablePath().toString(); + rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType()); + } + return new MultipleRowType(rowTypeMap); + } + + public static CatalogTable buildEmptyCatalogTable(String catalogName) { + return CatalogTable.of( + TableIdentifier.of(catalogName, TablePath.DEFAULT), + TableSchema.builder().build(), + new HashMap<>(), + new ArrayList<>(), + null); + } + + public static CatalogTable newCatalogTable( + CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) { + TableSchema tableSchema = catalogTable.getTableSchema(); + + Map<String, Column> columnMap = + tableSchema.getColumns().stream() + .collect(Collectors.toMap(Column::getName, Function.identity())); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes(); + + List<Column> finalColumns = new ArrayList<>(); + for (int i = 0; i < fieldNames.length; i++) { + Column column = columnMap.get(fieldNames[i]); + if (column != null) { + finalColumns.add(column); + } else { + finalColumns.add( + PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, false, null, null)); + } + } + + TableSchema finalSchema = + TableSchema.builder() + .columns(finalColumns) + .primaryKey(tableSchema.getPrimaryKey()) + .constraintKey(tableSchema.getConstraintKeys()) + .build(); + + return CatalogTable.of( + catalogTable.getTableId(), + finalSchema, + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index a6632a5873..5527417e91 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -174,12 +174,16 @@ public class MysqlDialect implements JdbcDialect { // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured , use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + if (useTableStats) { // The statement used to get approximate row count which is less // accurate than COUNT(*), but is more efficient for large table. diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index 1cfeb8d705..8dedc6dfc1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -184,12 +184,16 @@ public class OracleDialect implements JdbcDialect { // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured, use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + if (useTableStats) { TablePath tablePath = table.getTablePath(); String analyzeTable = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index d1bf6257ec..3a8b3cd807 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -155,19 +155,22 @@ public class PostgresDialect implements JdbcDialect { // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured, use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); if (useTableStats) { String rowCountQuery = String.format( "SELECT reltuples FROM pg_class r WHERE relkind = 'r' AND relname = '%s';", table.getTablePath().getTableName()); try (Statement stmt = connection.createStatement()) { - log.info("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); + log.error("Split Chunk, approximateRowCntStatement: {}", rowCountQuery); try (ResultSet rs = stmt.executeQuery(rowCountQuery)) { if (!rs.next()) { throw new SQLException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index 8826e1fdc9..87e7418966 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -165,12 +165,16 @@ public class SqlServerDialect implements JdbcDialect { // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured, use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + if (useTableStats) { TablePath tablePath = table.getTablePath(); try (Statement stmt = connection.createStatement()) {
