This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 94fe1e081 [api-draft][catalog] jdbc catalog (#2042)
94fe1e081 is described below

commit 94fe1e081cf48e6095a860d431795675e871b651
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Jun 27 13:58:16 2022 +0800

    [api-draft][catalog] jdbc catalog (#2042)
    
    * [api-draft][catalog] jdbc catalog
    
    # Conflicts:
    #       seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
    
    * [api-draft][catalog] Add AbstractJdbcCatalog with defaultUrl constructor
    
    Co-authored-by: Hisoka <[email protected]>
---
 pom.xml                                            |  20 +-
 .../seatunnel/api/table/catalog/Catalog.java       |  91 ++++++++
 .../seatunnel/api/table/catalog/TableSchema.java   |  94 ++++++++-
 .../exception/DatabaseNotExistException.java       |  32 +++
 .../catalog/exception/TableNotExistException.java  |  35 ++++
 seatunnel-common/pom.xml                           |   5 +
 .../seatunnel-connectors-seatunnel/pom.xml         |  11 +
 .../seatunnel-connector-seatunnel-jdbc/pom.xml     |   1 +
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 211 +++++++++++++++++++
 .../seatunnel/jdbc/catalog/MySqlCatalog.java       | 229 +++++++++++++++++++++
 .../jdbc/catalog/AbstractJdbcCatalogTest.java      |  48 +++++
 11 files changed, 769 insertions(+), 8 deletions(-)

diff --git a/pom.xml b/pom.xml
index 504b8cc48..fcf707a46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,6 +153,7 @@
         <docker.tag>${project.version}</docker.tag>
         <jcommander.version>1.81</jcommander.version>
         <junit.version>4.13.2</junit.version>
+        <junit5.version>5.8.2</junit5.version>
         <tispark.version>2.4.1</tispark.version>
         <druid.version>0.22.1</druid.version>
         <sshd.version>2.7.0</sshd.version>
@@ -514,6 +515,20 @@
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.junit.jupiter</groupId>
+                <artifactId>junit-jupiter-engine</artifactId>
+                <version>${junit5.version}</version>
+                <scope>test</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>org.junit.jupiter</groupId>
+                <artifactId>junit-jupiter-params</artifactId>
+                <version>${junit5.version}</version>
+                <scope>test</scope>
+            </dependency>
+
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-annotations</artifactId>
@@ -665,11 +680,6 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 783304608..e3f83d9ff 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -17,9 +17,100 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import java.util.List;
+import java.util.Optional;
+
 /**
  * Interface for reading and writing table metadata from SeaTunnel. Each 
connector need to contain
  * the implementation of Catalog.
  */
 public interface Catalog {
+
+    default Optional<Factory> getFactory() {
+        return Optional.empty();
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization 
phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    void open() throws CatalogException;
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource 
that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    void close() throws CatalogException;
+
+    // 
--------------------------------------------------------------------------------------------
+    // database
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Get the name of the default database for this catalog. The default 
database will be the
+     * current database for the catalog when user's session doesn't specify a 
current database. The
+     * value probably comes from configuration, will not change for the life 
time of the catalog
+     * instance.
+     *
+     * @return the name of the current database
+     * @throws CatalogException in case of any runtime exception
+     */
+    String getDefaultDatabase() throws CatalogException;
+
+    /**
+     * Check if a database exists in this catalog.
+     *
+     * @param databaseName Name of the database
+     * @return true if the given database exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    boolean databaseExists(String databaseName) throws CatalogException;
+
+    /**
+     * Get the names of all databases in this catalog.
+     *
+     * @return a list of the names of all databases
+     * @throws CatalogException in case of any runtime exception
+     */
+    List<String> listDatabases() throws CatalogException;
+
+    // 
--------------------------------------------------------------------------------------------
+    // table
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Get names of all tables under this database. An empty list is returned 
if none
+     * exists.
+     *
+     * @return a list of the names of all tables in this database
+     * @throws CatalogException in case of any runtime exception
+     */
+    List<String> listTables(String databaseName) throws CatalogException, 
DatabaseNotExistException;
+
+    /**
+     * Check if a table exist in this catalog.
+     *
+     * @param tablePath Path of the table
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    boolean tableExists(TablePath tablePath) throws CatalogException;
+
+    /**
+     * Return a {@link CatalogTable}  identified by the given {@link
+     * TablePath}. The framework will resolve the metadata objects when 
necessary.
+     *
+     * @param tablePath Path of the table
+     * @return The requested table
+     * @throws CatalogException in case of any runtime exception
+     */
+    CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException;
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 04ca51698..61d99ef52 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -17,8 +17,12 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Represent a physical table schema.
@@ -27,12 +31,15 @@ public final class TableSchema implements Serializable {
     private static final long serialVersionUID = 1L;
     private final List<Column> columns;
 
-    private TableSchema(List<Column> columns) {
+    private final PrimaryKey primaryKey;
+
+    private TableSchema(List<Column> columns, PrimaryKey primaryKey) {
         this.columns = columns;
+        this.primaryKey = primaryKey;
     }
 
-    public static TableSchema of(List<Column> columns) {
-        return new TableSchema(columns);
+    public static TableSchema.Builder builder() {
+        return new Builder();
     }
 
     /**
@@ -41,4 +48,85 @@ public final class TableSchema implements Serializable {
     public List<Column> getColumns() {
         return columns;
     }
+
+    public static final class Builder {
+        private final List<Column> columns = new ArrayList<>();
+
+        private PrimaryKey primaryKey;
+
+        public Builder columns(List<Column> columns) {
+            this.columns.addAll(columns);
+            return this;
+        }
+
+        public Builder column(Column column) {
+            this.columns.add(column);
+            return this;
+        }
+
+        public Builder physicalColumn(String name, SeaTunnelDataType<?> 
dataType) {
+            this.columns.add(Column.physical(name, dataType));
+            return this;
+        }
+
+        public Builder primaryKey(PrimaryKey primaryKey) {
+            this.primaryKey = primaryKey;
+            return this;
+        }
+
+        public Builder primaryKey(String constraintName, List<String> 
columnNames) {
+            this.primaryKey = PrimaryKey.of(constraintName, columnNames);
+            return this;
+        }
+
+        public TableSchema build() {
+            return new TableSchema(columns, primaryKey);
+        }
+    }
+
+    public static final class PrimaryKey implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String constraintName;
+        private final List<String> columnNames;
+
+        private PrimaryKey(String constraintName, List<String> columnNames) {
+            this.constraintName = constraintName;
+            this.columnNames = columnNames;
+        }
+
+        public static PrimaryKey of(String constraintName, List<String> 
columnNames) {
+            return new PrimaryKey(constraintName, columnNames);
+        }
+
+        public String getConstraintName() {
+            return constraintName;
+        }
+
+        public List<String> getColumnNames() {
+            return columnNames;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("CONSTRAINT %s PRIMARY KEY (%s) NOT 
ENFORCED", constraintName, String.join(", ", columnNames));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PrimaryKey that = (PrimaryKey) o;
+            return constraintName.equals(that.constraintName) && 
columnNames.equals(that.columnNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(constraintName, columnNames);
+        }
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseNotExistException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseNotExistException.java
new file mode 100644
index 000000000..5e81505f7
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseNotExistException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+/** Exception for trying to operate on a database that doesn't exist. */
+public class DatabaseNotExistException extends Exception {
+    private static final String MSG = "Database %s does not exist in Catalog 
%s.";
+
+    public DatabaseNotExistException(String catalogName, String databaseName, 
Throwable cause) {
+        super(String.format(MSG, databaseName, catalogName), cause);
+    }
+
+    public DatabaseNotExistException(String catalogName, String databaseName) {
+        this(catalogName, databaseName, null);
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableNotExistException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableNotExistException.java
new file mode 100644
index 000000000..d37a586eb
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableNotExistException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+/** Exception for trying to operate on a table that doesn't exist. */
+public class TableNotExistException extends Exception {
+
+    private static final String MSG = "Table %s does not exist in Catalog %s.";
+
+    public TableNotExistException(String catalogName, TablePath tablePath) {
+        this(catalogName, tablePath, null);
+    }
+
+    public TableNotExistException(String catalogName, TablePath tablePath, 
Throwable cause) {
+        super(String.format(MSG, tablePath.getFullName(), catalogName), cause);
+    }
+}
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index bf801a04a..aecac47f5 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -55,6 +55,11 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 6b94005e3..4c28aa789 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -43,4 +43,15 @@
         <module>seatunnel-connector-seatunnel-hive</module>
     </modules>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+        </dependency>
+    </dependencies>
+
 </project>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
index d5c9ad279..28dd4cb5f 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
@@ -40,6 +40,7 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
new file mode 100644
index 000000000..852fbbe0a
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public abstract class AbstractJdbcCatalog implements Catalog {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+
+    protected final String catalogName;
+    protected final String defaultDatabase;
+    protected final String username;
+    protected final String pwd;
+    protected final String baseUrl;
+    protected final String defaultUrl;
+
+    public AbstractJdbcCatalog(
+        String catalogName,
+        String defaultDatabase,
+        String username,
+        String pwd,
+        String baseUrl) {
+
+        checkArgument(StringUtils.isNotBlank(username));
+        checkArgument(StringUtils.isNotBlank(pwd));
+        checkArgument(StringUtils.isNotBlank(baseUrl));
+
+        baseUrl = baseUrl.trim();
+        validateJdbcUrlWithoutDatabase(baseUrl);
+        this.catalogName = catalogName;
+        this.defaultDatabase = defaultDatabase;
+        this.username = username;
+        this.pwd = pwd;
+        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+        this.defaultUrl = this.baseUrl + defaultDatabase;
+    }
+
+    /**
+     * URL has to be without database, like "jdbc:mysql://localhost:5432/" or
+     * "jdbc:mysql://localhost:5432" rather than 
"jdbc:mysql://localhost:5432/db".
+     */
+    public static void validateJdbcUrlWithoutDatabase(String url) {
+        String[] parts = url.trim().split("\\/+");
+
+        checkArgument(parts.length == 2);
+    }
+
+    public AbstractJdbcCatalog(
+        String catalogName,
+        String username,
+        String pwd,
+        String defaultUrl) {
+
+        checkArgument(StringUtils.isNotBlank(username));
+        checkArgument(StringUtils.isNotBlank(pwd));
+        checkArgument(StringUtils.isNotBlank(defaultUrl));
+
+        defaultUrl = defaultUrl.trim();
+        validateJdbcUrlWithDatabase(defaultUrl);
+        this.catalogName = catalogName;
+        this.username = username;
+        this.pwd = pwd;
+        this.defaultUrl = defaultUrl;
+        String[] strings = splitDefaultUrl(defaultUrl);
+        this.baseUrl = strings[0];
+        this.defaultDatabase = strings[1];
+    }
+
+    /**
+     * URL has to be with database, like "jdbc:mysql://localhost:5432/db" 
rather than "jdbc:mysql://localhost:5432/".
+     */
+    @SuppressWarnings("MagicNumber")
+    public static void validateJdbcUrlWithDatabase(String url) {
+        String[] parts = url.trim().split("\\/+");
+        checkArgument(parts.length == 3);
+    }
+
+    /**
+     * Ensure that the url was validated {@link #validateJdbcUrlWithDatabase}.
+     *
+     * @return The array size is fixed at 2, index 0 is base url, and index 1 
is default database.
+     */
+    public static String[] splitDefaultUrl(String defaultUrl) {
+        String[] res = new String[2];
+        int index = defaultUrl.lastIndexOf("/")  + 1;
+        res[0] = defaultUrl.substring(0, index);
+        res[1] = defaultUrl.substring(index, defaultUrl.length());
+        return res;
+    }
+
+    @Override
+    public String getDefaultDatabase() {
+        return defaultDatabase;
+    }
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return pwd;
+    }
+
+    public String getBaseUrl() {
+        return baseUrl;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            // test connection, fail early if we cannot connect to database
+        } catch (SQLException e) {
+            throw new CatalogException(
+                String.format("Failed connecting to %s via JDBC.", 
defaultUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", catalogName, 
defaultUrl);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        LOG.info("Catalog {} closing", catalogName);
+    }
+
+    protected Optional<TableSchema.PrimaryKey> getPrimaryKey(
+        DatabaseMetaData metaData, String schema, String table) throws 
SQLException {
+
+        // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            // all the PK_NAME should be the same
+            pkName = rs.getString("PK_NAME");
+            int keySeq = rs.getInt("KEY_SEQ");
+            // KEY_SEQ is 1-based index
+            keySeqColumnName.put(keySeq - 1, columnName);
+        }
+        // initialize size
+        List<String> pkFields = Arrays.asList(new 
String[keySeqColumnName.size()]);
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique 
name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : 
pkName;
+            return Optional.of(TableSchema.PrimaryKey.of(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        checkArgument(StringUtils.isNotBlank(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                && 
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
new file mode 100644
index 000000000..69638d042
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.mysql.cj.MysqlType;
+import com.mysql.cj.jdbc.result.ResultSetImpl;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class MySqlCatalog extends AbstractJdbcCatalog {
+
+    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
+
+    static {
+        SYS_DATABASES.add("information_schema");
+        SYS_DATABASES.add("mysql");
+        SYS_DATABASES.add("performance_schema");
+        SYS_DATABASES.add("sys");
+    }
+
+    public MySqlCatalog(String catalogName, String defaultDatabase, String 
username, String pwd, String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+    }
+
+    public MySqlCatalog(String catalogName, String username, String pwd, 
String defaultUrl) {
+        super(catalogName, username, pwd, defaultUrl);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+
+            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws 
CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
+        }
+
+        try (Connection conn = DriverManager.getConnection(baseUrl + 
databaseName, username, pwd)) {
+            PreparedStatement ps =
+                conn.prepareStatement("SHOW TABLES;");
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<TableSchema.PrimaryKey> primaryKey =
+                getPrimaryKey(metaData, tablePath.getDatabaseName(), 
tablePath.getTableName());
+
+            PreparedStatement ps =
+                conn.prepareStatement(String.format("SELECT * FROM %s LIMIT 
1;", tablePath.getFullName()));
+
+            ResultSetMetaData tableMetaData = ps.getMetaData();
+
+            TableSchema.Builder builder = TableSchema.builder();
+            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                builder.physicalColumn(tableMetaData.getColumnName(i), type);
+            }
+
+            primaryKey.ifPresent(builder::primaryKey);
+
+            TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, 
tablePath.getDatabaseName(), tablePath.getTableName());
+            return CatalogTable.of(tableIdentifier, builder.build(), 
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed getting table 
%s", tablePath.getFullName()), e);
+        }
+    }
+
+    /**
+     * @see com.mysql.cj.MysqlType
+     * @see ResultSetImpl#getObjectStoredProc(int, int)
+     */
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
+        MysqlType mysqlType = 
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
+        switch (mysqlType) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BIT:
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case TINYINT_UNSIGNED:
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SMALLINT_UNSIGNED:
+            case INT:
+            case MEDIUMINT:
+            case MEDIUMINT_UNSIGNED:
+                return BasicType.INT_TYPE;
+            case INT_UNSIGNED:
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+                return BasicType.DOUBLE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            // TODO: to confirm
+            case CHAR:
+            case VARCHAR:
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case JSON:
+            case ENUM:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case GEOMETRY:
+                return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                int precision = metadata.getPrecision(colIndex);
+                int scale = metadata.getScale(colIndex);
+                return BasicType.BIG_DECIMAL_TYPE;
+                // TODO: support 'SET' & 'YEAR' type
+            default:
+                throw new UnsupportedOperationException(String.format("Doesn't 
support MySQL type '%s' yet", mysqlType.getName()));
+        }
+    }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", baseUrl + tablePath.getDatabaseName());
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+}
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
new file mode 100644
index 000000000..251c24d21
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class AbstractJdbcCatalogTest {
+
+    @ParameterizedTest
+    @ValueSource(strings = {"jdbc:mysql://localhost:5432/", 
"jdbc:mysql://localhost:5432", "jdbc:mysql://localhost:5432/", 
"jdbc:postgresql://localhost:5432"})
+    public void testValidateJdbcBaseUrl(String baseUrl) {
+        Assertions.assertDoesNotThrow(() -> 
AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(baseUrl));
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
AbstractJdbcCatalog.validateJdbcUrlWithDatabase(baseUrl));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"jdbc:mysql://localhost:5432/db", 
"jdbc:postgresql://localhost:5432/db"})
+    public void testValidateJdbcDefault(String defaultUrl) {
+        Assertions.assertDoesNotThrow(() -> 
AbstractJdbcCatalog.validateJdbcUrlWithDatabase(defaultUrl));
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> 
AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(defaultUrl));
+    }
+
+    @ParameterizedTest
+    @CsvSource({"jdbc:mysql://localhost:5432/db, jdbc:mysql://localhost:5432/, 
db",
+        "jdbc:postgresql://localhost:5432/db, 
jdbc:postgresql://localhost:5432/, db"})
+    public void testSplitDefaultUrl(String defaultUrl, String expectedUrl, 
String expectedDatabase) {
+        Assertions.assertArrayEquals(new String[] {expectedUrl, 
expectedDatabase}, AbstractJdbcCatalog.splitDefaultUrl(defaultUrl));
+    }
+}

Reply via email to