This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch apache_240219_doris_type_converter
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

commit b5799e44501be7cc66a77691e8744d59e0dd4831
Author: Eric <[email protected]>
AuthorDate: Fri Jan 26 11:32:57 2024 +0800

    Fix DEFAULT TABLE problem
---
 .../api/table/catalog/CatalogTableUtil.java        | 70 +++++++++++++++++++++-
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |  8 ++-
 .../internal/dialect/oracle/OracleDialect.java     |  8 ++-
 .../internal/dialect/psql/PostgresDialect.java     |  9 ++-
 .../dialect/sqlserver/SqlServerDialect.java        |  8 ++-
 5 files changed, 92 insertions(+), 11 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index 6f2b6adeb2..1794fd0949 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -37,11 +37,15 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Utils contains some common methods for construct CatalogTable. */
 @Slf4j
@@ -115,14 +119,14 @@ public class CatalogTableUtil implements Serializable {
         return optionalCatalog
                 .map(
                         c -> {
-                            long startTime = System.currentTimeMillis();
                             try (Catalog catalog = c) {
+                                long startTime = System.currentTimeMillis();
                                 catalog.open();
                                 List<CatalogTable> catalogTables =
                                         catalog.getTables(readonlyConfig);
                                 log.info(
                                         String.format(
-                                                "Get catalog tables, cost 
time: %d",
+                                                "Get catalog tables, cost 
time: %d/ms",
                                                 System.currentTimeMillis() - 
startTime));
                                 if (catalogTables.isEmpty()) {
                                     throw new SeaTunnelException(
@@ -234,4 +238,66 @@ public class CatalogTableUtil implements Serializable {
     public static CatalogTable buildSimpleTextTable() {
         return getCatalogTable("default", buildSimpleTextSchema());
     }
+
+    public static SeaTunnelDataType 
toSeaTunnelRowType(Collection<CatalogTable> catalogTables) {
+
+        if (catalogTables.size() == 1) {
+            Iterator<CatalogTable> iterator = catalogTables.iterator();
+            return iterator.next().getSeaTunnelRowType();
+        }
+
+        Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+
+        for (CatalogTable catalogTable : catalogTables) {
+            String tableId = 
catalogTable.getTableId().toTablePath().toString();
+            rowTypeMap.put(tableId, 
catalogTable.getTableSchema().toPhysicalRowDataType());
+        }
+        return new MultipleRowType(rowTypeMap);
+    }
+
+    public static CatalogTable buildEmptyCatalogTable(String catalogName) {
+        return CatalogTable.of(
+                TableIdentifier.of(catalogName, TablePath.DEFAULT),
+                TableSchema.builder().build(),
+                new HashMap<>(),
+                new ArrayList<>(),
+                null);
+    }
+
+    public static CatalogTable newCatalogTable(
+            CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = catalogTable.getTableSchema();
+
+        Map<String, Column> columnMap =
+                tableSchema.getColumns().stream()
+                        .collect(Collectors.toMap(Column::getName, 
Function.identity()));
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+        List<Column> finalColumns = new ArrayList<>();
+        for (int i = 0; i < fieldNames.length; i++) {
+            Column column = columnMap.get(fieldNames[i]);
+            if (column != null) {
+                finalColumns.add(column);
+            } else {
+                finalColumns.add(
+                        PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, 
false, null, null));
+            }
+        }
+
+        TableSchema finalSchema =
+                TableSchema.builder()
+                        .columns(finalColumns)
+                        .primaryKey(tableSchema.getPrimaryKey())
+                        .constraintKey(tableSchema.getConstraintKeys())
+                        .build();
+
+        return CatalogTable.of(
+                catalogTable.getTableId(),
+                finalSchema,
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName());
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index a6632a5873..5527417e91 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -174,12 +174,16 @@ public class MysqlDialect implements JdbcDialect {
         // 2. If a query is configured but does not contain a WHERE clause and 
tablePath is
         // configured , use TABLE STATUS.
         // 3. If a query is configured with a WHERE clause, or a query 
statement is configured but
-        // tablePath is not, use COUNT(*).
+        // tablePath is TablePath.DEFAULT, use COUNT(*).
 
         boolean useTableStats =
                 StringUtils.isBlank(table.getQuery())
                         || (!table.getQuery().toLowerCase().contains("where")
-                                && table.getTablePath() != null);
+                                && table.getTablePath() != null
+                                && !TablePath.DEFAULT
+                                        .getFullName()
+                                        
.equals(table.getTablePath().getFullName()));
+
         if (useTableStats) {
             // The statement used to get approximate row count which is less
             // accurate than COUNT(*), but is more efficient for large table.
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 1cfeb8d705..8dedc6dfc1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -184,12 +184,16 @@ public class OracleDialect implements JdbcDialect {
         // 2. If a query is configured but does not contain a WHERE clause and 
tablePath is
         // configured, use TABLE STATUS.
         // 3. If a query is configured with a WHERE clause, or a query 
statement is configured but
-        // tablePath is not, use COUNT(*).
+        // tablePath is TablePath.DEFAULT, use COUNT(*).
 
         boolean useTableStats =
                 StringUtils.isBlank(table.getQuery())
                         || (!table.getQuery().toLowerCase().contains("where")
-                                && table.getTablePath() != null);
+                                && table.getTablePath() != null
+                                && !TablePath.DEFAULT
+                                        .getFullName()
+                                        
.equals(table.getTablePath().getFullName()));
+
         if (useTableStats) {
             TablePath tablePath = table.getTablePath();
             String analyzeTable =
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index d1bf6257ec..3a8b3cd807 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -155,19 +155,22 @@ public class PostgresDialect implements JdbcDialect {
         // 2. If a query is configured but does not contain a WHERE clause and 
tablePath is
         // configured, use TABLE STATUS.
         // 3. If a query is configured with a WHERE clause, or a query 
statement is configured but
-        // tablePath is not, use COUNT(*).
+        // tablePath is TablePath.DEFAULT, use COUNT(*).
 
         boolean useTableStats =
                 StringUtils.isBlank(table.getQuery())
                         || (!table.getQuery().toLowerCase().contains("where")
-                                && table.getTablePath() != null);
+                                && table.getTablePath() != null
+                                && !TablePath.DEFAULT
+                                        .getFullName()
+                                        
.equals(table.getTablePath().getFullName()));
         if (useTableStats) {
             String rowCountQuery =
                     String.format(
                             "SELECT reltuples FROM pg_class r WHERE relkind = 
'r' AND relname = '%s';",
                             table.getTablePath().getTableName());
             try (Statement stmt = connection.createStatement()) {
-                log.info("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
+                log.error("Split Chunk, approximateRowCntStatement: {}", 
rowCountQuery);
                 try (ResultSet rs = stmt.executeQuery(rowCountQuery)) {
                     if (!rs.next()) {
                         throw new SQLException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 8826e1fdc9..87e7418966 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -165,12 +165,16 @@ public class SqlServerDialect implements JdbcDialect {
         // 2. If a query is configured but does not contain a WHERE clause and 
tablePath is
         // configured, use TABLE STATUS.
         // 3. If a query is configured with a WHERE clause, or a query 
statement is configured but
-        // tablePath is not, use COUNT(*).
+        // tablePath is TablePath.DEFAULT, use COUNT(*).
 
         boolean useTableStats =
                 StringUtils.isBlank(table.getQuery())
                         || (!table.getQuery().toLowerCase().contains("where")
-                                && table.getTablePath() != null);
+                                && table.getTablePath() != null
+                                && !TablePath.DEFAULT
+                                        .getFullName()
+                                        
.equals(table.getTablePath().getFullName()));
+
         if (useTableStats) {
             TablePath tablePath = table.getTablePath();
             try (Statement stmt = connection.createStatement()) {

Reply via email to