yzeng1618 commented on code in PR #10427:
URL: https://github.com/apache/seatunnel/pull/10427#discussion_r2752793495


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilder.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseTypeConverter;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class KingbaseCreateTableSqlBuilder {
+
+    private List<Column> columns;
+    private PrimaryKey primaryKey;
+    private String sourceCatalogName;
+    private String fieldIde;
+    private boolean createIndex;
+
+    public KingbaseCreateTableSqlBuilder(CatalogTable catalogTable, boolean 
createIndex) {
+        this.columns = catalogTable.getTableSchema().getColumns();
+        this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        this.sourceCatalogName = catalogTable.getCatalogName();
+        this.fieldIde = catalogTable.getOptions().get("fieldIde");
+        this.createIndex = createIndex;
+    }
+
+    public String build(TablePath tablePath) {
+        StringBuilder createTableSql = new StringBuilder();
+        createTableSql
+                .append("CREATE TABLE ")
+                .append(tablePath.getSchemaAndTableName("\""))
+                .append(" (\n");
+
+        List<String> columnSqls =
+                columns.stream()
+                        .map(column -> 
CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
+                        .collect(Collectors.toList());
+
+        // Add primary key directly in the create table statement
+        if (createIndex
+                && primaryKey != null
+                && primaryKey.getColumnNames() != null
+                && primaryKey.getColumnNames().size() > 0) {
+            columnSqls.add(buildPrimaryKeySql(primaryKey));
+        }
+
+        createTableSql.append(String.join(",\n", columnSqls));
+        createTableSql.append("\n)");
+
+        List<String> commentSqls =
+                columns.stream()
+                        .filter(column -> 
StringUtils.isNotBlank(column.getComment()))
+                        .map(
+                                column ->
+                                        buildColumnCommentSql(
+                                                column, 
tablePath.getSchemaAndTableName("\"")))
+                        .collect(Collectors.toList());
+
+        if (!commentSqls.isEmpty()) {
+            createTableSql.append(";\n");
+            createTableSql.append(String.join(";\n", commentSqls));
+        }
+
+        return createTableSql.toString();
+    }
+
+    private String buildColumnSql(Column column) {
+        StringBuilder columnSql = new StringBuilder();
+        columnSql.append("\"").append(column.getName()).append("\" ");
+
+        String columnType =
+                StringUtils.equalsIgnoreCase(DatabaseIdentifier.KINGBASE, 
sourceCatalogName)
+                        ? column.getSourceType()
+                        : 
KingbaseTypeConverter.INSTANCE.reconvert(column).getColumnType();
+        columnSql.append(columnType);
+
+        if (!column.isNullable()) {
+            columnSql.append(" NOT NULL");
+        }
+
+        return columnSql.toString();
+    }
+
+    private String buildPrimaryKeySql(PrimaryKey primaryKey) {
+        String randomSuffix = UUID.randomUUID().toString().replace("-", 
"").substring(0, 4);
+        String columnNamesString =
+                primaryKey.getColumnNames().stream()
+                        .map(columnName -> "\"" + columnName + "\"")
+                        .collect(Collectors.joining(", "));
+
+        String primaryKeyStr = primaryKey.getPrimaryKey();
+        if (primaryKeyStr.length() > 25) {
+            primaryKeyStr = primaryKeyStr.substring(0, 25);
+        }
+
+        return CatalogUtils.getFieldIde(
+                "CONSTRAINT "
+                        + primaryKeyStr
+                        + "_"
+                        + randomSuffix
+                        + " PRIMARY KEY ("
+                        + columnNamesString
+                        + ")",
+                fieldIde);
+    }
+
+    private String buildColumnCommentSql(Column column, String tableName) {
+        StringBuilder columnCommentSql = new StringBuilder();
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", 
fieldIde))
+                .append(tableName)
+                .append(".");
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier(column.getName(), 
fieldIde, "\""))
+                .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+                .append(column.getComment())
+                .append("'");

Review Comment:
   Unescaped single quotes in comment concatenation: it is recommended to use 
replace("'", "''").



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalog.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;
+
+@Slf4j
+public class KingbaseCatalog extends AbstractJdbcCatalog {
+
+    protected static List<String> EXCLUDED_SCHEMAS =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            "INFORMATION_SCHEMA",
+                            "SYSAUDIT",
+                            "SYSLOGICAL",
+                            "SYS_CATALOG",
+                            "SYS_HM",
+                            "XLOG_RECORD_READ"));
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+            " SELECT \n"
+                    + "    a.attname AS column_name,\n"
+                    + "    CASE \n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) IN 
('varchar', 'character varying') THEN 'VARCHAR'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) IN 
('char', 'character') THEN 'CHAR'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) IN 
('boolean', 'bool') THEN 'BOOL'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'real' THEN 'FLOAT4'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'double precision' THEN 'FLOAT8'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'integer' THEN 'INT4'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'bigint' THEN 'INT8'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'smallint' THEN 'INT2'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'time without time zone' THEN 'TIME'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'timestamp without time zone' THEN 'TIMESTAMP'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'timestamp with time zone' THEN 'TIMESTAMPTZ'\n"
+                    + "        ELSE format_type(a.atttypid, NULL)\n"
+                    + "    END AS type_name,\n"
+                    + "    format_type(a.atttypid, a.atttypmod) AS 
full_type_name,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ( 'CHAR','CHARACTER','VARCHAR','CHARACTER VARYING','BPCHAR') 
)\n"
+                    + "        THEN ABS(a.atttypmod)     \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('NUMERIC', 'DECIMAL'))\n"
+                    + "        THEN (a.atttypmod - 4) >> 16\n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('INT', 'INTEGER', 'SMALLINT', 'BIGINT'))\n"
+                    + "        THEN NULL\n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('TIME','TIMESTAMPTZ', 'TIMESTAMP'))\n"
+                    + "        THEN NULL\n"
+                    + "        ELSE NULL\n"
+                    + "    END AS column_length,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('NUMERIC', 'DECIMAL'))\n"
+                    + "        THEN (a.atttypmod - 4) >> 16\n"
+                    + "        ELSE NULL\n"
+                    + "    END AS column_precision,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('NUMERIC', 'DECIMAL'))\n"
+                    + "        THEN (a.atttypmod - 4) & 65535\n"
+                    + "        ELSE NULL\n"
+                    + "    END AS column_scale,\n"
+                    + "    d.description AS column_comment,\n"
+                    + "    pg_get_expr(ad.adbin, ad.adrelid) AS 
default_value,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.attnotnull = false THEN 'YES'\n"
+                    + "        ELSE 'NO'\n"
+                    + "    END AS is_nullable\n"
+                    + "FROM \n"
+                    + "    sys_class c\n"
+                    + "    JOIN sys_namespace n ON c.relnamespace = n.oid\n"
+                    + "    JOIN sys_attribute a ON a.attrelid = c.oid\n"
+                    + "    LEFT JOIN sys_description d ON d.objoid = 
a.attrelid AND d.objsubid = a.attnum\n"
+                    + "    LEFT JOIN sys_attrdef ad ON ad.adrelid = a.attrelid 
AND ad.adnum = a.attnum\n"
+                    + "WHERE \n"
+                    + "    n.nspname = '%s' \n"
+                    + "    AND c.relname = '%s' \n"
+                    + "    AND a.attnum > 0 \n"
+                    + "    AND NOT a.attisdropped;";
+
+    public KingbaseCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
+    }
+
+    @Override
+    protected String getListDatabaseSql() {
+        return "SELECT current_database();";
+    }
+
+    /**
+     * Override the databaseExists method because SELECT current_database() 
does not support WHERE
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        if (StringUtils.isBlank(databaseName)) {
+            return false;
+        }
+        try {
+            return querySQLResultExists(getUrlFromDatabaseName(databaseName), 
getListDatabaseSql());
+        } catch (SeaTunnelRuntimeException e) {
+            if 
(e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+                log.warn(
+                        "The catalog: {} is not supported the 
getListDatabaseSql for databaseExists",
+                        this.catalogName);
+                return listDatabases().contains(databaseName);
+            }
+            throw e;
+        } catch (SQLException e) {
+            throw new CatalogException("查询数据库是否存在失败: " + databaseName, e);
+        }
+    }
+
+    @Override
+    protected String getCreateTableSql(
+            TablePath tablePath, CatalogTable table, boolean createIndex) {
+        return new KingbaseCreateTableSqlBuilder(table, 
createIndex).build(tablePath);
+    }
+
+    @Override
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format("DROP TABLE %s", 
tablePath.getSchemaAndTableName("\""));
+    }
+
+    @Override
+    protected String getListTableSql(String databaseName) {
+        return "SELECT SCHEMANAME ,TABLENAME FROM SYS_TABLES";
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + "  where SCHEMANAME = '%s' and TABLENAME = '%s';",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
+    @Override
+    protected String getTableName(ResultSet rs) throws SQLException {
+        if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+            return null;
+        }
+        return rs.getString(1) + "." + rs.getString(2);
+    }
+
+    @Override
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
+        String columnName = resultSet.getString("COLUMN_NAME");
+        String typeName = resultSet.getString("TYPE_NAME");
+        String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
+        long columnLength = resultSet.getLong("COLUMN_LENGTH");
+        long columnPrecision = resultSet.getLong("COLUMN_PRECISION");
+        int columnScale = resultSet.getInt("COLUMN_SCALE");
+        String columnComment = resultSet.getString("COLUMN_COMMENT");
+        Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
+        boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(columnName)
+                        .columnType(typeName)

Review Comment:
   FULL_TYPE_NAME is read but not used; columnType/sourceType loses information 
such as VARCHAR(255), CHAR(10), and NUMERIC(38,18).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to