This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new d8a0be84c [Feature] [Catalog] Support create/drop table, create/drop 
database in catalog (#4075)
d8a0be84c is described below

commit d8a0be84ca4cfc38ccedad33bc77a05d1803b4a4
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 10 14:25:17 2023 +0800

    [Feature] [Catalog] Support create/drop table, create/drop database in 
catalog (#4075)
    
    * Add create/drop table in catalog
---
 .../api/common/SeaTunnelAPIErrorCode.java          |   5 +-
 .../seatunnel/api/table/catalog/Catalog.java       |  33 ++++
 .../apache/seatunnel/api/table/catalog/Column.java | 195 +++----------------
 .../seatunnel/api/table/catalog/ConstraintKey.java |  76 ++++++++
 .../api/table/catalog/MetadataColumn.java          |  65 +++++++
 .../api/table/catalog/PhysicalColumn.java          |  60 ++++++
 .../PrimaryKey.java}                               |  29 +--
 .../seatunnel/api/table/catalog/TableSchema.java   |  79 ++------
 .../exception/DatabaseAlreadyExistException.java   |  34 ++++
 .../exception/TableAlreadyExistException.java}     |  26 ++-
 .../api/table/type/SeaTunnelDataType.java          |   1 +
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 126 +++++++++++--
 .../seatunnel/jdbc/catalog/MySqlCatalog.java       | 168 ++++++++---------
 .../catalog/sql/MysqlCreateTableSqlBuilder.java    | 209 +++++++++++++++++++++
 .../seatunnel/jdbc/utils/DataTypeUtils.java        | 138 ++++++++++++++
 .../sql/MysqlCreateTableSqlBuilderTest.java        |  79 ++++++++
 16 files changed, 965 insertions(+), 358 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index e89875ef9..6adf373ed 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -25,7 +25,10 @@ public enum SeaTunnelAPIErrorCode implements 
SeaTunnelErrorCode {
     CATALOG_INITIALIZE_FAILED("API-03", "Catalog initialize failed"),
     DATABASE_NOT_EXISTED("API-04", "Database not existed"),
     TABLE_NOT_EXISTED("API-05", "Table not existed"),
-    FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),;
+    FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),
+    DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
+    TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
+    ;
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index e3f83d9ff..90a443d03 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.api.table.catalog;
 
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.factory.Factory;
 
@@ -113,4 +115,35 @@ public interface Catalog {
      * @throws CatalogException in case of any runtime exception
      */
     CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException;
+
+    /**
+     * Create a new table in this catalog.
+     *
+     * @param tablePath      Path of the table
+     * @param table          The table definition
+     * @param ignoreIfExists Flag to specify behavior when a table with the 
given name already exist
+     * @throws TableAlreadyExistException thrown if the table already exists 
in the catalog and ignoreIfExists is false
+     * @throws DatabaseNotExistException  thrown if the database in tablePath 
doesn't exist in the catalog
+     * @throws CatalogException           in case of any runtime exception
+     */
+    void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+        throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException;
+
+    /**
+     * Drop an existing table in this catalog.
+     *
+     * @param tablePath         Path of the table
+     * @param ignoreIfNotExists Flag to specify behavior when a table with the 
given name doesn't exist
+     * @throws TableNotExistException thrown if the table doesn't exist in the 
catalog and ignoreIfNotExists is false
+     * @throws CatalogException       in case of any runtime exception
+     */
+    void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+        throws TableNotExistException, CatalogException;
+
+    void createDatabase(TablePath tablePath, boolean ignoreIfExists) throws 
DatabaseAlreadyExistException, CatalogException;
+
+    void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws 
DatabaseNotExistException, CatalogException;
+
+    // todo: Support for update table metadata
+
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index aea7fade2..6e11e8a6e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -20,10 +20,17 @@ package org.apache.seatunnel.api.table.catalog;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
+import lombok.Data;
+
 import java.io.Serializable;
-import java.util.Objects;
-import java.util.Optional;
 
+/**
+ * Represent the column of {@link TableSchema}.
+ *
+ * @see PhysicalColumn
+ * @see MetadataColumn
+ */
+@Data
 @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
 public abstract class Column implements Serializable {
 
@@ -37,38 +44,37 @@ public abstract class Column implements Serializable {
     /**
      * Data type of the column.
      */
+    // todo: use generic type
     protected final SeaTunnelDataType<?> dataType;
 
-    protected final String comment;
-
-    private Column(String name, SeaTunnelDataType<?> dataType, String comment) 
{
-        this.name = name;
-        this.dataType = dataType;
-        this.comment = comment;
-    }
+    protected final Integer columnLength;
 
     /**
-     * Creates a regular table column that represents physical data.
+     * Does the column can be null
      */
-    public static PhysicalColumn physical(String name, SeaTunnelDataType<?> 
dataType) {
-        return new PhysicalColumn(name, dataType);
-    }
+    protected final boolean nullable;
 
+    // todo: use generic type
     /**
-     * Creates a metadata column from metadata of the given column name or 
from metadata of the
-     * given key (if not null).
-     *
-     * <p>Allows to specify whether the column is virtual or not.
+     * The default value of the column.
      */
-    public static MetadataColumn metadata(
-        String name, SeaTunnelDataType<?> dataType, String metadataKey) {
-        return new MetadataColumn(name, dataType, metadataKey);
-    }
+    protected final Object defaultValue;
 
-    /**
-     * Add the comment to the column and return the new object.
-     */
-    public abstract Column withComment(String comment);
+    protected final String comment;
+
+    protected Column(String name,
+                     SeaTunnelDataType<?> dataType,
+                     Integer columnLength,
+                     boolean nullable,
+                     Object defaultValue,
+                     String comment) {
+        this.name = name;
+        this.dataType = dataType;
+        this.columnLength = columnLength;
+        this.nullable = nullable;
+        this.defaultValue = defaultValue;
+        this.comment = comment;
+    }
 
     /**
      * Returns whether the given column is a physical column of a table; 
neither computed nor
@@ -76,148 +82,9 @@ public abstract class Column implements Serializable {
      */
     public abstract boolean isPhysical();
 
-    /**
-     * Returns the data type of this column.
-     */
-    public SeaTunnelDataType<?> getDataType() {
-        return this.dataType;
-    }
-
-    /**
-     * Returns the name of this column.
-     */
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * Returns the comment of this column.
-     */
-    public Optional<String> getComment() {
-        return Optional.ofNullable(comment);
-    }
-
     /**
      * Returns a copy of the column with a replaced {@link SeaTunnelDataType}.
      */
     public abstract Column copy(SeaTunnelDataType<?> newType);
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Column that = (Column) o;
-        return Objects.equals(this.name, that.name)
-                && Objects.equals(this.dataType, that.dataType)
-                && Objects.equals(this.comment, that.comment);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(this.name, this.dataType);
-    }
-
-    // 
--------------------------------------------------------------------------------------------
-    // Specific kinds of columns
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * Representation of a physical column.
-     */
-    public static final class PhysicalColumn extends Column {
-
-        private PhysicalColumn(String name, SeaTunnelDataType<?> dataType) {
-            this(name, dataType, null);
-        }
-
-        private PhysicalColumn(String name, SeaTunnelDataType<?> dataType, 
String comment) {
-            super(name, dataType, comment);
-        }
-
-        @Override
-        public PhysicalColumn withComment(String comment) {
-            if (comment == null) {
-                return this;
-            }
-            return new PhysicalColumn(name, dataType, comment);
-        }
-
-        @Override
-        public boolean isPhysical() {
-            return true;
-        }
-
-        @Override
-        public Column copy(SeaTunnelDataType<?> newDataType) {
-            return new PhysicalColumn(name, newDataType, comment);
-        }
-    }
-
-    /**
-     * Representation of a metadata column.
-     */
-    public static final class MetadataColumn extends Column {
-
-        private final String metadataKey;
-
-        private MetadataColumn(
-            String name, SeaTunnelDataType<?> dataType, String metadataKey) {
-            this(name, dataType, metadataKey, null);
-        }
-
-        private MetadataColumn(
-                String name,
-                SeaTunnelDataType<?> dataType,
-                String metadataKey,
-                String comment) {
-            super(name, dataType, comment);
-            this.metadataKey = metadataKey;
-        }
-
-        public Optional<String> getMetadataKey() {
-            return Optional.ofNullable(metadataKey);
-        }
-
-        @Override
-        public MetadataColumn withComment(String comment) {
-            if (comment == null) {
-                return this;
-            }
-            return new MetadataColumn(name, dataType, metadataKey, comment);
-        }
-
-        @Override
-        public boolean isPhysical() {
-            return false;
-        }
-
-        @Override
-        public Column copy(SeaTunnelDataType<?> newDataType) {
-            return new MetadataColumn(name, newDataType, metadataKey, comment);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            if (!super.equals(o)) {
-                return false;
-            }
-            MetadataColumn that = (MetadataColumn) o;
-            return Objects.equals(metadataKey, that.metadataKey);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(super.hashCode(), metadataKey);
-        }
-    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
new file mode 100644
index 000000000..a294c4667
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/ConstraintKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class ConstraintKey implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ConstraintType constraintType;
+
+    private final String constraintName;
+
+    private final List<ConstraintKeyColumn> columnNames;
+
+    private ConstraintKey(ConstraintType constraintType,
+                          String constraintName,
+                          List<ConstraintKeyColumn> columnNames) {
+        checkNotNull(constraintType, "constraintType must not be null");
+
+        this.constraintType = constraintType;
+        this.constraintName = constraintName;
+        this.columnNames = columnNames;
+    }
+
+    public static ConstraintKey of(ConstraintType constraintType,
+                                   String constraintName,
+                                   List<ConstraintKeyColumn> columnNames) {
+        return new ConstraintKey(constraintType, constraintName, columnNames);
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class ConstraintKeyColumn {
+        private final String columnName;
+        private final ColumnSortType sortType;
+
+        public static ConstraintKeyColumn of(String columnName, ColumnSortType 
sortType) {
+            return new ConstraintKeyColumn(columnName, sortType);
+        }
+    }
+
+    public enum ConstraintType {
+        KEY,
+        UNIQUE_KEY,
+        FOREIGN_KEY
+    }
+
+    public enum ColumnSortType {
+        ASC,
+        DESC
+    }
+
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
new file mode 100644
index 000000000..a88641861
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Representation of a metadata column.
+ */
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class MetadataColumn extends Column {
+
+    private final String metadataKey;
+
+    protected MetadataColumn(String name,
+                             SeaTunnelDataType<?> dataType,
+                             Integer columnLength,
+                             String metadataKey,
+                             boolean nullable,
+                             Object defaultValue,
+                             String comment) {
+        super(name, dataType, columnLength, nullable, defaultValue, comment);
+        this.metadataKey = metadataKey;
+    }
+
+    public static MetadataColumn of(String name,
+                                    SeaTunnelDataType<?> dataType,
+                                    Integer columnLength,
+                                    String metadataKey,
+                                    boolean nullable,
+                                    Object defaultValue,
+                                    String comment) {
+        return new MetadataColumn(name, dataType, columnLength, metadataKey, 
nullable, defaultValue, comment);
+    }
+
+    @Override
+    public boolean isPhysical() {
+        return false;
+    }
+
+    @Override
+    public Column copy(SeaTunnelDataType<?> newType) {
+        return MetadataColumn.of(name, newType, columnLength, metadataKey, 
nullable, defaultValue, comment);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
new file mode 100644
index 000000000..bc22772ac
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Representation of a physical column.
+ */
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class PhysicalColumn extends Column {
+
+    protected PhysicalColumn(String name,
+                             SeaTunnelDataType<?> dataType,
+                             Integer columnLength,
+                             boolean nullable,
+                             Object defaultValue,
+                             String comment) {
+        super(name, dataType, columnLength, nullable, defaultValue, comment);
+    }
+
+    public static PhysicalColumn of(String name,
+                                    SeaTunnelDataType<?> dataType,
+                                    Integer columnLength,
+                                    boolean nullable,
+                                    Object defaultValue,
+                                    String comment) {
+        return new PhysicalColumn(name, dataType, columnLength, nullable, 
defaultValue, comment);
+    }
+
+    @Override
+    public boolean isPhysical() {
+        return true;
+    }
+
+    @Override
+    public Column copy(SeaTunnelDataType<?> newType) {
+        return PhysicalColumn.of(name, newType, columnLength, nullable, 
defaultValue, comment);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
similarity index 62%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
index 946f4a728..d5e32c828 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PrimaryKey.java
@@ -15,22 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.api.table.catalog;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
 import java.io.Serializable;
+import java.util.List;
 
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> extends Serializable {
+@Data
+@AllArgsConstructor
+public class PrimaryKey implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // This field is not used now
+    private final String primaryKey;
 
-    /**
-     * Gets the class of the type represented by this data type.
-     */
-    Class<T> getTypeClass();
+    private final List<String> columnNames;
 
-    /**
-     * Gets the SQL standard type represented by this data type.
-     */
-    SqlType getSqlType();
+    public static PrimaryKey of(String primaryKey, List<String> columnNames) {
+        return new PrimaryKey(primaryKey, columnNames);
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 0d076943e..74adafc6c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -20,36 +20,30 @@ package org.apache.seatunnel.api.table.catalog;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 /**
  * Represent a physical table schema.
  */
+@Data
+@AllArgsConstructor
 public final class TableSchema implements Serializable {
     private static final long serialVersionUID = 1L;
     private final List<Column> columns;
 
     private final PrimaryKey primaryKey;
 
-    private TableSchema(List<Column> columns, PrimaryKey primaryKey) {
-        this.columns = columns;
-        this.primaryKey = primaryKey;
-    }
+    private final List<ConstraintKey> constraintKeys;
 
-    public static TableSchema.Builder builder() {
+    public static Builder builder() {
         return new Builder();
     }
 
-    /**
-     * Returns all {@link Column}s of this schema.
-     */
-    public List<Column> getColumns() {
-        return columns;
-    }
-
     public SeaTunnelRowType toPhysicalRowDataType() {
         SeaTunnelDataType<?>[] fieldTypes = columns.stream()
             .filter(Column::isPhysical)
@@ -67,6 +61,8 @@ public final class TableSchema implements Serializable {
 
         private PrimaryKey primaryKey;
 
+        private final List<ConstraintKey> constraintKeys = new ArrayList<>();
+
         public Builder columns(List<Column> columns) {
             this.columns.addAll(columns);
             return this;
@@ -77,69 +73,18 @@ public final class TableSchema implements Serializable {
             return this;
         }
 
-        public Builder physicalColumn(String name, SeaTunnelDataType<?> 
dataType) {
-            this.columns.add(Column.physical(name, dataType));
-            return this;
-        }
-
         public Builder primaryKey(PrimaryKey primaryKey) {
             this.primaryKey = primaryKey;
             return this;
         }
 
-        public Builder primaryKey(String constraintName, List<String> 
columnNames) {
-            this.primaryKey = PrimaryKey.of(constraintName, columnNames);
+        public Builder constraintKey(ConstraintKey constraintKey) {
+            this.constraintKeys.add(constraintKey);
             return this;
         }
 
         public TableSchema build() {
-            return new TableSchema(columns, primaryKey);
-        }
-    }
-
-    public static final class PrimaryKey implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private final String constraintName;
-        private final List<String> columnNames;
-
-        private PrimaryKey(String constraintName, List<String> columnNames) {
-            this.constraintName = constraintName;
-            this.columnNames = columnNames;
-        }
-
-        public static PrimaryKey of(String constraintName, List<String> 
columnNames) {
-            return new PrimaryKey(constraintName, columnNames);
-        }
-
-        public String getConstraintName() {
-            return constraintName;
-        }
-
-        public List<String> getColumnNames() {
-            return columnNames;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("CONSTRAINT %s PRIMARY KEY (%s) NOT 
ENFORCED", constraintName, String.join(", ", columnNames));
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PrimaryKey that = (PrimaryKey) o;
-            return constraintName.equals(that.constraintName) && 
columnNames.equals(that.columnNames);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(constraintName, columnNames);
+            return new TableSchema(columns, primaryKey, constraintKeys);
         }
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseAlreadyExistException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseAlreadyExistException.java
new file mode 100644
index 000000000..40e4acb98
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseAlreadyExistException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class DatabaseAlreadyExistException extends SeaTunnelRuntimeException {
+    private static final String MSG = "Database %s already exist in Catalog 
%s.";
+
+    public DatabaseAlreadyExistException(String catalogName, String 
databaseName) {
+        this(catalogName, databaseName, null);
+    }
+
+    public DatabaseAlreadyExistException(String catalogName, String 
databaseName, Throwable cause) {
+        super(SeaTunnelAPIErrorCode.DATABASE_ALREADY_EXISTED, 
String.format(MSG, databaseName, catalogName), cause);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableAlreadyExistException.java
similarity index 51%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableAlreadyExistException.java
index 946f4a728..8fe7c368f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableAlreadyExistException.java
@@ -15,22 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.api.table.catalog.exception;
 
-import java.io.Serializable;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
-/**
- * Logic data type of column in SeaTunnel.
- */
-public interface SeaTunnelDataType<T> extends Serializable {
+public class TableAlreadyExistException extends SeaTunnelRuntimeException {
+    private static final String MSG = "Table %s already exist in Catalog %s.";
 
-    /**
-     * Gets the class of the type represented by this data type.
-     */
-    Class<T> getTypeClass();
+    public TableAlreadyExistException(String catalogName, TablePath tablePath) 
{
+        this(catalogName, tablePath, null);
+    }
 
-    /**
-     * Gets the SQL standard type represented by this data type.
-     */
-    SqlType getSqlType();
+    public TableAlreadyExistException(String catalogName, TablePath tablePath, 
Throwable cause) {
+        super(SeaTunnelAPIErrorCode.TABLE_ALREADY_EXISTED, String.format(MSG, 
tablePath.getFullName(), catalogName), cause);
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
index 946f4a728..3f6c2c4b8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelDataType.java
@@ -33,4 +33,5 @@ public interface SeaTunnelDataType<T> extends Serializable {
      * Gets the SQL standard type represented by this data type.
      */
     SqlType getSqlType();
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 852fbbe0a..79b5397a4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -19,14 +19,22 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,11 +43,13 @@ import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public abstract class AbstractJdbcCatalog implements Catalog {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
@@ -163,15 +173,15 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         LOG.info("Catalog {} closing", catalogName);
     }
 
-    protected Optional<TableSchema.PrimaryKey> getPrimaryKey(
-        DatabaseMetaData metaData, String schema, String table) throws 
SQLException {
+    protected Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, 
String database, String table) throws SQLException {
 
         // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(database, table, table);
 
-        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        // seq -> column name
+        List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
         String pkName = null;
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
@@ -179,15 +189,52 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
             pkName = rs.getString("PK_NAME");
             int keySeq = rs.getInt("KEY_SEQ");
             // KEY_SEQ is 1-based index
-            keySeqColumnName.put(keySeq - 1, columnName);
+            primaryKeyColumns.add(Pair.of(keySeq, columnName));
         }
         // initialize size
-        List<String> pkFields = Arrays.asList(new 
String[keySeqColumnName.size()]);
-        keySeqColumnName.forEach(pkFields::set);
-        if (!pkFields.isEmpty()) {
-            // PK_NAME maybe null according to the javadoc, generate an unique 
name in that case
-            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : 
pkName;
-            return Optional.of(TableSchema.PrimaryKey.of(pkName, pkFields));
+        List<String> pkFields = primaryKeyColumns
+            .stream()
+            .sorted(Comparator.comparingInt(Pair::getKey))
+            .map(Pair::getValue)
+            .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(pkFields)) {
+            return Optional.empty();
+        }
+        return Optional.of(PrimaryKey.of(pkName, pkFields));
+    }
+
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
String database, String table) throws SQLException {
+        ResultSet resultSet = metaData.getIndexInfo(database, table, table, 
false, false);
+        // index name -> index
+        Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+        while (resultSet.next()) {
+            String indexName = resultSet.getString("INDEX_NAME");
+            String columnName = resultSet.getString("COLUMN_NAME");
+            String unique = resultSet.getString("NON_UNIQUE");
+
+            ConstraintKey constraintKey = 
constraintKeyMap.computeIfAbsent(indexName, s -> {
+                ConstraintKey.ConstraintType constraintType = 
ConstraintKey.ConstraintType.KEY;
+                // 0 is unique.
+                if ("0".equals(unique)) {
+                    constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
+                }
+                return ConstraintKey.of(constraintType, indexName, new 
ArrayList<>());
+            });
+
+            ConstraintKey.ColumnSortType sortType = 
"A".equals(resultSet.getString("ASC_OR_DESC")) ?
+                ConstraintKey.ColumnSortType.ASC : 
ConstraintKey.ColumnSortType.DESC;
+            ConstraintKey.ConstraintKeyColumn constraintKeyColumn = new 
ConstraintKey.ConstraintKeyColumn(columnName, sortType);
+            constraintKey.getColumnNames().add(constraintKeyColumn);
+        }
+        return new ArrayList<>(constraintKeyMap.values());
+    }
+
+    protected Optional<String> getColumnDefaultValue(DatabaseMetaData 
metaData, String table, String column) throws SQLException {
+        try (ResultSet resultSet = metaData.getColumns(null, null, table, 
"MyColumn")) {
+            while (resultSet.next()) {
+                String defaultValue = resultSet.getString("COLUMN_DEF");
+                return Optional.ofNullable(defaultValue);
+            }
         }
         return Optional.empty();
     }
@@ -208,4 +255,57 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
             return false;
         }
     }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+        throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+        }
+        if (!createTableInternal(tablePath, table) && !ignoreIfExists) {
+            throw new TableAlreadyExistException(catalogName, tablePath);
+        }
+    }
+
+    protected abstract boolean createTableInternal(TablePath tablePath, 
CatalogTable table) throws CatalogException;
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        if (!dropTableInternal(tablePath) && !ignoreIfNotExists) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+    }
+
+    protected abstract boolean dropTableInternal(TablePath tablePath) throws 
CatalogException;
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists) 
throws DatabaseAlreadyExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        checkNotNull(tablePath.getDatabaseName(), "Database name cannot be 
null");
+
+        if (databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseAlreadyExistException(catalogName, 
tablePath.getDatabaseName());
+        }
+        if (!createDatabaseInternal(tablePath.getDatabaseName()) && 
!ignoreIfExists) {
+            throw new DatabaseAlreadyExistException(catalogName, 
tablePath.getDatabaseName());
+        }
+    }
+
+    protected abstract boolean createDatabaseInternal(String databaseName);
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) 
throws DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        checkNotNull(tablePath.getDatabaseName(), "Database name cannot be 
null");
+
+        if (!dropDatabaseInternal(tablePath.getDatabaseName()) && 
!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+        }
+    }
+
+    protected abstract boolean dropDatabaseInternal(String databaseName) 
throws CatalogException;
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 99f928afd..8683d571f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -19,19 +19,18 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql.MysqlCreateTableSqlBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
 
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
@@ -73,9 +72,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
 
     @Override
     public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-
-            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
+             PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) {
 
             List<String> databases = new ArrayList<>();
             ResultSet rs = ps.executeQuery();
@@ -100,9 +98,8 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
             throw new DatabaseNotExistException(this.catalogName, 
databaseName);
         }
 
-        try (Connection conn = DriverManager.getConnection(baseUrl + 
databaseName, username, pwd)) {
-            PreparedStatement ps =
-                conn.prepareStatement("SHOW TABLES;");
+        try (Connection conn = DriverManager.getConnection(baseUrl + 
databaseName, username, pwd);
+             PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) {
 
             ResultSet rs = ps.executeQuery();
 
@@ -128,26 +125,82 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
         String dbUrl = baseUrl + tablePath.getDatabaseName();
         try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd)) {
             DatabaseMetaData metaData = conn.getMetaData();
-            Optional<TableSchema.PrimaryKey> primaryKey =
-                getPrimaryKey(metaData, tablePath.getDatabaseName(), 
tablePath.getTableName());
+            Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, 
tablePath.getDatabaseName(), tablePath.getTableName());
+            List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, 
tablePath.getDatabaseName(), tablePath.getTableName());
+
+            try (PreparedStatement ps = 
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;", 
tablePath.getFullName()))) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                TableSchema.Builder builder = TableSchema.builder();
+                // add column
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    String columnName = tableMetaData.getColumnName(i);
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    int columnDisplaySize = 
tableMetaData.getColumnDisplaySize(i);
+                    String comment = tableMetaData.getColumnLabel(i);
+                    boolean isNullable = tableMetaData.isNullable(i) == 
ResultSetMetaData.columnNullable;
+                    Object defaultValue = getColumnDefaultValue(metaData, 
tablePath.getTableName(), columnName).orElse(null);
+
+                    PhysicalColumn physicalColumn = 
PhysicalColumn.of(columnName, type, columnDisplaySize, isNullable, 
defaultValue, comment);
+                    builder.column(physicalColumn);
+                }
+                // add primary key
+                primaryKey.ifPresent(builder::primaryKey);
+                // add constraint key
+                constraintKeys.forEach(builder::constraintKey);
+                TableIdentifier tableIdentifier = 
TableIdentifier.of(catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
+                return CatalogTable.of(tableIdentifier, builder.build(), 
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+            }
 
-            PreparedStatement ps =
-                conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 
= 0;", tablePath.getFullName()));
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed getting table 
%s", tablePath.getFullName()), e);
+        }
+    }
 
-            ResultSetMetaData tableMetaData = ps.getMetaData();
+    // todo: If the origin source is mysql, we can directly use create table 
like to create the target table?
+    @Override
+    protected boolean createTableInternal(TablePath tablePath, CatalogTable 
table) throws CatalogException {
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, 
table)
+            .build();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd);
+             PreparedStatement ps = conn.prepareStatement(createTableSql)) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed creating table 
%s", tablePath.getFullName()), e);
+        }
+    }
 
-            TableSchema.Builder builder = TableSchema.builder();
-            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
-                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
-                builder.physicalColumn(tableMetaData.getColumnName(i), type);
-            }
+    @Override
+    protected boolean dropTableInternal(TablePath tablePath) throws 
CatalogException {
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd);
+             PreparedStatement ps = conn.prepareStatement(String.format("DROP 
TABLE %s IF EXIST;", tablePath.getFullName()))) {
+            // Will there exist concurrent drop for one table?
+            return ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed dropping table 
%s", tablePath.getFullName()), e);
+        }
+    }
 
-            primaryKey.ifPresent(builder::primaryKey);
+    @Override
+    protected boolean createDatabaseInternal(String databaseName) throws 
CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
+             PreparedStatement ps = 
conn.prepareStatement(String.format("CREATE DATABASE `%s`;", databaseName))) {
+            return ps.execute();
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed creating database %s in catalog %s", 
databaseName, this.catalogName), e);
+        }
+    }
 
-            TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, 
tablePath.getDatabaseName(), tablePath.getTableName());
-            return CatalogTable.of(tableIdentifier, builder.build(), 
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+    @Override
+    protected boolean dropDatabaseInternal(String databaseName) throws 
CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd);
+             PreparedStatement ps = conn.prepareStatement(String.format("DROP 
DATABASE `%s`;", databaseName))) {
+            return ps.execute();
         } catch (Exception e) {
-            throw new CatalogException(String.format("Failed getting table 
%s", tablePath.getFullName()), e);
+            throw new CatalogException(
+                String.format("Failed dropping database %s in catalog %s", 
databaseName, this.catalogName), e);
         }
     }
 
@@ -156,67 +209,10 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
      * @see ResultSetImpl#getObjectStoredProc(int, int)
      */
     private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
         MysqlType mysqlType = 
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
-        switch (mysqlType) {
-            case NULL:
-                return BasicType.VOID_TYPE;
-            case BOOLEAN:
-                return BasicType.BOOLEAN_TYPE;
-            case BIT:
-            case TINYINT:
-                return BasicType.BYTE_TYPE;
-            case TINYINT_UNSIGNED:
-            case SMALLINT:
-                return BasicType.SHORT_TYPE;
-            case SMALLINT_UNSIGNED:
-            case INT:
-            case MEDIUMINT:
-            case MEDIUMINT_UNSIGNED:
-                return BasicType.INT_TYPE;
-            case INT_UNSIGNED:
-            case BIGINT:
-                return BasicType.LONG_TYPE;
-            case FLOAT:
-            case FLOAT_UNSIGNED:
-                return BasicType.FLOAT_TYPE;
-            case DOUBLE:
-            case DOUBLE_UNSIGNED:
-                return BasicType.DOUBLE_TYPE;
-            case TIME:
-                return LocalTimeType.LOCAL_TIME_TYPE;
-            case DATE:
-                return LocalTimeType.LOCAL_DATE_TYPE;
-            case TIMESTAMP:
-            case DATETIME:
-                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
-            // TODO: to confirm
-            case CHAR:
-            case VARCHAR:
-            case TINYTEXT:
-            case TEXT:
-            case MEDIUMTEXT:
-            case LONGTEXT:
-            case JSON:
-            case ENUM:
-                return BasicType.STRING_TYPE;
-            case BINARY:
-            case VARBINARY:
-            case TINYBLOB:
-            case BLOB:
-            case MEDIUMBLOB:
-            case LONGBLOB:
-            case GEOMETRY:
-                return PrimitiveByteArrayType.INSTANCE;
-            case BIGINT_UNSIGNED:
-            case DECIMAL:
-            case DECIMAL_UNSIGNED:
-                int precision = metadata.getPrecision(colIndex);
-                int scale = metadata.getScale(colIndex);
-                return new DecimalType(precision, scale);
-                // TODO: support 'SET' & 'YEAR' type
-            default:
-                throw new 
JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
-        }
+        return DataTypeUtils.toSeaTunnelDataType(mysqlType, precision, scale);
     }
 
     @SuppressWarnings("MagicNumber")
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
new file mode 100644
index 000000000..c06b93160
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MysqlCreateTableSqlBuilder {
+
+
+    private final String tableName;
+    private List<Column> columns;
+
+    private String comment;
+
+    private String engine;
+    private String charset;
+    private String collate;
+
+    private PrimaryKey primaryKey;
+
+    private List<ConstraintKey> constraintKeys;
+
+    private MysqlCreateTableSqlBuilder(String tableName) {
+        checkNotNull(tableName, "tableName must not be null");
+        this.tableName = tableName;
+    }
+
+    public static MysqlCreateTableSqlBuilder builder(TablePath tablePath, 
CatalogTable catalogTable) {
+        checkNotNull(tablePath, "tablePath must not be null");
+        checkNotNull(catalogTable, "catalogTable must not be null");
+
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        checkNotNull(tableSchema, "tableSchema must not be null");
+
+        return new MysqlCreateTableSqlBuilder(tablePath.getTableName())
+            .comment(catalogTable.getComment())
+            // todo: set charset and collate
+            .engine(null)
+            .charset(null)
+            .primaryKey(tableSchema.getPrimaryKey())
+            .constraintKeys(tableSchema.getConstraintKeys())
+            .addColumn(tableSchema.getColumns());
+    }
+
+    public MysqlCreateTableSqlBuilder addColumn(List<Column> columns) {
+        checkArgument(CollectionUtils.isNotEmpty(columns), "columns must not 
be empty");
+        this.columns = columns;
+        return this;
+    }
+
+    public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) {
+        this.primaryKey = primaryKey;
+        return this;
+    }
+
+    public MysqlCreateTableSqlBuilder constraintKeys(List<ConstraintKey> 
constraintKeys) {
+        this.constraintKeys = constraintKeys;
+        return this;
+    }
+
+    public MysqlCreateTableSqlBuilder engine(String engine) {
+        this.engine = engine;
+        return this;
+    }
+
+    public MysqlCreateTableSqlBuilder charset(String charset) {
+        this.charset = charset;
+        return this;
+    }
+
+    public MysqlCreateTableSqlBuilder collate(String collate) {
+        this.collate = collate;
+        return this;
+    }
+
+    public MysqlCreateTableSqlBuilder comment(String comment) {
+        this.comment = comment;
+        return this;
+    }
+
+    public String build() {
+        List<String> sqls = new ArrayList<>();
+        sqls.add(String.format("CREATE TABLE IF NOT EXISTS %s (\n%s\n)", 
tableName, buildColumnsIdentifySql()));
+        if (engine != null) {
+            sqls.add("ENGINE = " + engine);
+        }
+        if (charset != null) {
+            sqls.add("DEFAULT CHARSET = " + charset);
+        }
+        if (collate != null) {
+            sqls.add("COLLATE = " + collate);
+        }
+        if (comment != null) {
+            sqls.add("COMMENT = '" + comment + "'");
+        }
+        return String.join(" ", sqls) + ";";
+    }
+
+    private String buildColumnsIdentifySql() {
+        List<String> columnSqls = new ArrayList<>();
+        for (Column column : columns) {
+            columnSqls.add("\t" + buildColumnIdentifySql(column));
+        }
+        if (primaryKey != null) {
+            columnSqls.add("\t" + buildPrimaryKeySql());
+        }
+        if (CollectionUtils.isNotEmpty(constraintKeys)) {
+            for (ConstraintKey constraintKey : constraintKeys) {
+                columnSqls.add("\t" + buildConstraintKeySql(constraintKey));
+            }
+        }
+        return String.join(", \n", columnSqls);
+    }
+
+    private String buildColumnIdentifySql(Column column) {
+        final List<String> columnSqls = new ArrayList<>();
+        // Column name
+        columnSqls.add(column.getName());
+        // Column type
+        
columnSqls.add(DataTypeUtils.toMysqlType(column.getDataType()).getName());
+        // Column length
+        if (column.getColumnLength() != null) {
+            columnSqls.add("(" + column.getColumnLength() + ")");
+        }
+        // nullable
+        if (column.isNullable()) {
+            columnSqls.add("NULL");
+        } else {
+            columnSqls.add("NOT NULL");
+        }
+        // default value
+        if (column.getDefaultValue() != null) {
+            columnSqls.add("DEFAULT '" + column.getDefaultValue() + "'");
+        }
+        // comment
+        if (column.getComment() != null) {
+            columnSqls.add("COMMENT '" + column.getComment() + "'");
+        }
+        return String.join(" ", columnSqls);
+    }
+
+    private String buildPrimaryKeySql() {
+        String key = primaryKey.getColumnNames()
+            .stream()
+            .map(columnName -> "`" + columnName + "`")
+            .collect(Collectors.joining(", "));
+        // add sort type
+        return String.format("PRIMARY KEY (%s)", key);
+    }
+
+    private String buildConstraintKeySql(ConstraintKey constraintKey) {
+        ConstraintKey.ConstraintType constraintType = 
constraintKey.getConstraintType();
+        String indexColumns = constraintKey.getColumnNames()
+            .stream()
+            .map(constraintKeyColumn -> {
+                if (constraintKeyColumn.getSortType() == null) {
+                    return String.format("`%s`", 
constraintKeyColumn.getColumnName());
+                }
+                return String.format("`%s` %s", 
constraintKeyColumn.getColumnName(), constraintKeyColumn.getSortType().name());
+            }).collect(Collectors.joining(", "));
+        String keyName = null;
+        switch (constraintType) {
+            case KEY:
+                keyName = "KEY";
+                break;
+            case UNIQUE_KEY:
+                keyName = "UNIQUE KEY";
+                break;
+            case FOREIGN_KEY:
+                keyName = "FOREIGN KEY";
+                // todo:
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported 
constraint type: " + constraintType);
+        }
+        return String.format("%s `%s` (%s)", keyName, 
constraintKey.getConstraintName(), indexColumns);
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
new file mode 100644
index 000000000..83af004f5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+
+import com.mysql.cj.MysqlType;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@UtilityClass
+public class DataTypeUtils {
+
+    public static SeaTunnelDataType<?> toSeaTunnelDataType(MysqlType 
mysqlType, int precision, int scale) {
+        switch (mysqlType) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BIT:
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case TINYINT_UNSIGNED:
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SMALLINT_UNSIGNED:
+            case INT:
+            case MEDIUMINT:
+            case MEDIUMINT_UNSIGNED:
+                return BasicType.INT_TYPE;
+            case INT_UNSIGNED:
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+                return BasicType.DOUBLE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            // TODO: to confirm
+            case CHAR:
+            case VARCHAR:
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case JSON:
+            case ENUM:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                return new DecimalType(precision, scale);
+            // TODO: support 'SET' & 'YEAR' type
+            default:
+                throw new 
JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
+        }
+    }
+
+    public MysqlType toMysqlType(SeaTunnelDataType<?> seaTunnelDataType) {
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        // todo: verify
+        switch (sqlType) {
+            case ARRAY:
+            case MAP:
+            case ROW:
+            case STRING:
+                return MysqlType.VARCHAR;
+            case BOOLEAN:
+                return MysqlType.BOOLEAN;
+            case TINYINT:
+                return MysqlType.TINYINT;
+            case SMALLINT:
+                return MysqlType.SMALLINT;
+            case INT:
+                return MysqlType.INT;
+            case BIGINT:
+                return MysqlType.BIGINT;
+            case FLOAT:
+                return MysqlType.FLOAT;
+            case DOUBLE:
+                return MysqlType.DOUBLE;
+            case DECIMAL:
+                return MysqlType.DECIMAL;
+            case NULL:
+                return MysqlType.NULL;
+            case BYTES:
+                return MysqlType.BIT;
+            case DATE:
+                return MysqlType.DATE;
+            case TIME:
+                return MysqlType.DATETIME;
+            case TIMESTAMP:
+                return MysqlType.TIMESTAMP;
+            default:
+                throw new 
JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format("Doesn't support MySQL type '%s' yet", sqlType));
+
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
new file mode 100644
index 000000000..dca2304a7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class MysqlCreateTableSqlBuilderTest {
+
+    private static final PrintStream CONSOLE = System.out;
+
+    @Test
+    public void testBuild() {
+        // todo
+        String dataBaseName = "test_database";
+        String tableName = "test_table";
+        TablePath tablePath = TablePath.of(dataBaseName, tableName);
+        TableSchema tableSchema = TableSchema.builder()
+            .column(PhysicalColumn.of("id", BasicType.LONG_TYPE, 22, false, 
null, "id"))
+            .column(PhysicalColumn.of("name", BasicType.STRING_TYPE, 128, 
false, null, "name"))
+            .column(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true, 
null, "age"))
+            .column(PhysicalColumn.of("createTime", 
LocalTimeType.LOCAL_DATE_TIME_TYPE, 3, true, null, "createTime"))
+            .column(PhysicalColumn.of("lastUpdateTime", 
LocalTimeType.LOCAL_DATE_TIME_TYPE, 3, true, null, "lastUpdateTime"))
+            .primaryKey(PrimaryKey.of("id", Lists.newArrayList("id")))
+            .constraintKey(ConstraintKey.of(ConstraintKey.ConstraintType.KEY, 
"name", Lists.newArrayList(ConstraintKey.ConstraintKeyColumn.of("name", null))))
+            .build();
+        CatalogTable catalogTable = CatalogTable.of(
+            TableIdentifier.of("test_catalog", dataBaseName, tableName),
+            tableSchema,
+            new HashMap<>(),
+            new ArrayList<>(),
+            "User table"
+        );
+
+        String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, 
catalogTable)
+            .build();
+        String expect = "CREATE TABLE IF NOT EXISTS test_table (\n" +
+            "\tid BIGINT (22) NOT NULL COMMENT 'id', \n" +
+            "\tname VARCHAR (128) NOT NULL COMMENT 'name', \n" +
+            "\tage INT NULL COMMENT 'age', \n" +
+            "\tcreateTime TIMESTAMP (3) NULL COMMENT 'createTime', \n" +
+            "\tlastUpdateTime TIMESTAMP (3) NULL COMMENT 'lastUpdateTime', \n" 
+
+            "\tPRIMARY KEY (`id`), \n" +
+            "\tKEY `name` (`name`)\n" +
+            ") COMMENT = 'User table';";
+        CONSOLE.println(expect);
+        Assertions.assertEquals(expect, createTableSql);
+    }
+}

Reply via email to