Airblader commented on a change in pull request #16962:
URL: https://github.com/apache/flink/pull/16962#discussion_r711951494



##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,51 @@ As there is no standard syntax for upsert, the following 
table describes the dat
     </tbody>
 </table>
 
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over 
JDBC protocol.
 
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the 
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, `PostgresCatalog` and `MySQLCatalog` are the two implementations of 
JDBC Catalog at the moment, `JdbcCatalog` only supports limited `Catalog` 
methods include:

Review comment:
       This sentence is a bit confusing. How about this?
   
   ```suggestion
   Currently, there are two JDBC catalog implementations, `PostgresCatalog` and 
`MySQLCatalog`. They support the following catalog methods. Other methods are 
currently not supported.
   ```

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -458,6 +464,38 @@ SELECT * FROM mydb.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
 
+### Jdbc Catalog for MySQL
+
+#### MySQL Metaspace Mapping

Review comment:
       Is this entire section really useful? For Postgres it exists because the 
mapping isn't trivial, but for MySQL it behaves exactly as expected. This is a 
lot of text for saying just that, and e.g. being able to use 
`database.table_name` or `table_name` is not specific to the MySQLCatalog, but 
just how Flink SQL works.

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -428,12 +432,14 @@ catalogs:
 {{< /tab >}}
 {{< /tabs >}}
 
+### Jdbc Catalog for PostgreSQL

Review comment:
       ```suggestion
   ### JDBC Catalog for PostgreSQL
   ```

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,51 @@ As there is no standard syntax for upsert, the following 
table describes the dat
     </tbody>
 </table>
 
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over 
JDBC protocol.
 
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the 
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, `PostgresCatalog` and `MySQLCatalog` are the two implementations of 
JDBC Catalog at the moment, `JdbcCatalog` only supports limited `Catalog` 
methods include:
 
 ```java
-// The supported methods by Postgres Catalog.
-PostgresCatalog.databaseExists(String databaseName)
-PostgresCatalog.listDatabases()
-PostgresCatalog.getDatabase(String databaseName)
-PostgresCatalog.listTables(String databaseName)
-PostgresCatalog.getTable(ObjectPath tablePath)
-PostgresCatalog.tableExists(ObjectPath tablePath)
+// The supported methods by Postgres & MySQL Catalog.
+databaseExists(String databaseName);
+listDatabases();
+getDatabase(String databaseName);
+listTables(String databaseName);
+getTable(ObjectPath tablePath);
+tableExists(ObjectPath tablePath);
 ```
 
 Other `Catalog` methods is unsupported now.
 
-#### Usage of PostgresCatalog
+### Usage of JDBC Catalog
 
-Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and Postgres driver.
+The section mainly describes how to create and use a `PostgresCatalog` or 
`MySQLCatalog`.
+Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and Postgres/MySQL driver.

Review comment:
       It would be good to avoid explicitly listing all supported 
implementations where possible, e.g.
   
   ```suggestion
   Please refer to [Dependencies](#dependencies) section for how to setup a 
JDBC connector and the corresponding driver.
   ```

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,51 @@ As there is no standard syntax for upsert, the following 
table describes the dat
     </tbody>
 </table>
 
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over 
JDBC protocol.
 
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the 
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, `PostgresCatalog` and `MySQLCatalog` are the two implementations of 
JDBC Catalog at the moment, `JdbcCatalog` only supports limited `Catalog` 
methods include:
 
 ```java
-// The supported methods by Postgres Catalog.
-PostgresCatalog.databaseExists(String databaseName)
-PostgresCatalog.listDatabases()
-PostgresCatalog.getDatabase(String databaseName)
-PostgresCatalog.listTables(String databaseName)
-PostgresCatalog.getTable(ObjectPath tablePath)
-PostgresCatalog.tableExists(ObjectPath tablePath)
+// The supported methods by Postgres & MySQL Catalog.
+databaseExists(String databaseName);
+listDatabases();
+getDatabase(String databaseName);
+listTables(String databaseName);
+getTable(ObjectPath tablePath);
+tableExists(ObjectPath tablePath);
 ```
 
 Other `Catalog` methods is unsupported now.

Review comment:
       ```suggestion
   ```
   
   (moved to the paragraph above in my suggestion)

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,51 @@ As there is no standard syntax for upsert, the following 
table describes the dat
     </tbody>
 </table>
 
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over 
JDBC protocol.
 
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the 
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, `PostgresCatalog` and `MySQLCatalog` are the two implementations of 
JDBC Catalog at the moment, `JdbcCatalog` only supports limited `Catalog` 
methods include:
 
 ```java
-// The supported methods by Postgres Catalog.
-PostgresCatalog.databaseExists(String databaseName)
-PostgresCatalog.listDatabases()
-PostgresCatalog.getDatabase(String databaseName)
-PostgresCatalog.listTables(String databaseName)
-PostgresCatalog.getTable(ObjectPath tablePath)
-PostgresCatalog.tableExists(ObjectPath tablePath)
+// The supported methods by Postgres & MySQL Catalog.
+databaseExists(String databaseName);
+listDatabases();
+getDatabase(String databaseName);
+listTables(String databaseName);
+getTable(ObjectPath tablePath);
+tableExists(ObjectPath tablePath);
 ```
 
 Other `Catalog` methods is unsupported now.
 
-#### Usage of PostgresCatalog
+### Usage of JDBC Catalog
 
-Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and Postgres driver.
+The section mainly describes how to create and use a `PostgresCatalog` or 
`MySQLCatalog`.
+Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and Postgres/MySQL driver.
 
-Postgres catalog supports the following options:
+Postgres/MySQL catalog supports the following options:
 - `name`: required, name of the catalog.
 - `default-database`: required, default database to connect to.
-- `username`: required, username of Postgres account.
+- `username`: required, username of Postgres/MySQL account.
 - `password`: required, password of the account.
-- `base-url`: required, should be of format `"jdbc:postgresql://<ip>:<port>"`, 
and should not contain database name here.
+- `base-url`: required,
+  - `in PostgresCatalog case:` should be of format 
`"jdbc:postgresql://<ip>:<port>"`, and should not contain database name here.
+  - `in MySQLCatalog case:` should be of format `"jdbc:mysql://<ip>:<port>"`, 
and should not contain database name here.

Review comment:
       ```suggestion
   - `base-url`: required (should not contain the database name)
     - for `PostgresCatalog` this should be `"jdbc:postgresql://<ip>:<port>"`
     - for `MySQLCatalog` this should be `"jdbc:mysql://<ip>:<port>"`
   ```

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -458,6 +464,38 @@ SELECT * FROM mydb.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
 
+### Jdbc Catalog for MySQL
+
+#### MySQL Metaspace Mapping
+
+The databases in a `MySQL` instance are at the same mapping level as the 
databases under the catalog registered with `MySQLCatalog`. A MySQL instance 
can have multiple databases, each database can have multiple tables.
+In Flink, when querying tables registered by MySQL catalog, users can use 
either `database.table_name` or just `table_name`. The default value is the 
default database specified when `MySQLCatalog` was created.
+
+Therefore, the metaspace mapping between Flink Catalog and MySQLCatalog is as 
following:
+
+| Flink Catalog Metaspace Structure    |   MySQL Metaspace Structure         |
+| :------------------------------------| :-----------------------------------|
+| catalog name (defined in Flink only) | N/A                                 |
+| database name                        | database name                       |
+| table name                           | table_name                          |
+
+The full path of MySQL table in Flink should be 
``"`<catalog>`.`<db>`.`<table>`"``.
+
+Here are some examples to access MySQL tables:
+
+```sql
+-- scan table 'test_table', the default database is 'mydb'.
+SELECT * FROM mysql_catalog.mydb.test_table;
+SELECT * FROM mydb.test_table;
+SELECT * FROM test_table;
+
+-- scan table 'test_table' with the given database.
+SELECT * FROM mysql_catalog.given_database.test_table2

Review comment:
       ```suggestion
   SELECT * FROM mysql_catalog.given_database.test_table2;
   ```

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = 
"java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = 
"java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version 
must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be 
null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, 
databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM 
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> 
!builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        String sql =
+                String.format(
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` 
WHERE TABLE_SCHEMA = '%s'",
+                        databaseName);
+        return extractColumnValuesBySQL(connUrl, sql, 1, null);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws 
SQLException {
+
+        // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, 
table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the 
same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 
1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // 
initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique 
name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : 
pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), 
username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("SELECT * FROM %s;", 
tablePath.getObjectName()))) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, 
null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnsClassnames = new 
String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new 
DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnsClassnames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == 
ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            TableSchema.Builder tableBuilder =
+                    new TableSchema.Builder().fields(columnsClassnames, types);
+            primaryKey.ifPresent(
+                    pk ->
+                            tableBuilder.primaryKey(
+                                    pk.getName(), pk.getColumns().toArray(new 
String[0])));
+            TableSchema tableSchema = tableBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            ps.close();
+            return new CatalogTableImpl(tableSchema, props, "");

Review comment:
       Please follow the deprecation notice and use `CatalogTable#of` instead. 
This also removes the usage of the deprecated `TableSchema`.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = 
"java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = 
"java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version 
must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be 
null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, 
databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM 
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> 
!builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        String sql =
+                String.format(
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` 
WHERE TABLE_SCHEMA = '%s'",
+                        databaseName);
+        return extractColumnValuesBySQL(connUrl, sql, 1, null);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws 
SQLException {
+
+        // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, 
table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the 
same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 
1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // 
initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique 
name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : 
pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), 
username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("SELECT * FROM %s;", 
tablePath.getObjectName()))) {

Review comment:
       Possible SQL injection

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -458,6 +464,38 @@ SELECT * FROM mydb.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
 
+### Jdbc Catalog for MySQL
+
+#### MySQL Metaspace Mapping
+
+The databases in a `MySQL` instance are at the same mapping level as the 
databases under the catalog registered with `MySQLCatalog`. A MySQL instance 
can have multiple databases, each database can have multiple tables.

Review comment:
       "MySQL" does not refer to a class, so we don't need to format it as code.
   
   ```suggestion
   The databases in a MySQL instance are at the same mapping level as the 
databases under the catalog registered with `MySQLCatalog`. A MySQL instance 
can have multiple databases, each database can have multiple tables.
   ```

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -458,6 +464,38 @@ SELECT * FROM mydb.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
 
+### Jdbc Catalog for MySQL

Review comment:
       ```suggestion
   ### JDBC Catalog for MySQL
   ```

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";

Review comment:
       `@Internal`

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+public class MySQLCatalog extends AbstractJdbcCatalog {

Review comment:
       Please add an annotation here. PostgresCatalog is `@Internal`, but is 
that correct? Shouldn't catalogs be `@PublicEvolving`?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = 
"java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = 
"java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version 
must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be 
null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, 
databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM 
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> 
!builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        String sql =
+                String.format(
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` 
WHERE TABLE_SCHEMA = '%s'",
+                        databaseName);
+        return extractColumnValuesBySQL(connUrl, sql, 1, null);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws 
SQLException {
+
+        // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, 
table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the 
same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 
1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // 
initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique 
name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : 
pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), 
username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement(
+                                String.format("SELECT * FROM %s;", 
tablePath.getObjectName()))) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, 
null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnsClassnames = new 
String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new 
DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnsClassnames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == 
ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            TableSchema.Builder tableBuilder =
+                    new TableSchema.Builder().fields(columnsClassnames, types);
+            primaryKey.ifPresent(
+                    pk ->
+                            tableBuilder.primaryKey(
+                                    pk.getName(), pk.getColumns().toArray(new 
String[0])));
+            TableSchema tableSchema = tableBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            ps.close();
+            return new CatalogTableImpl(tableSchema, props, "");
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        try {
+            return 
listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());

Review comment:
       This will look up all tables to test existence. We can implement this 
better by querying directly whether this table exists.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static 
org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = 
"java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = 
"java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version 
must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be 
null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, 
databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM 
`INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> 
!builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not 
be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        String sql =
+                String.format(
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` 
WHERE TABLE_SCHEMA = '%s'",

Review comment:
       This would allow a SQL injection. Please use prepared statements.

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,51 @@ As there is no standard syntax for upsert, the following 
table describes the dat
     </tbody>
 </table>
 
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over 
JDBC protocol.
 
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the 
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, `PostgresCatalog` and `MySQLCatalog` are the two implementations of 
JDBC Catalog at the moment, `JdbcCatalog` only supports limited `Catalog` 
methods include:
 
 ```java
-// The supported methods by Postgres Catalog.
-PostgresCatalog.databaseExists(String databaseName)
-PostgresCatalog.listDatabases()
-PostgresCatalog.getDatabase(String databaseName)
-PostgresCatalog.listTables(String databaseName)
-PostgresCatalog.getTable(ObjectPath tablePath)
-PostgresCatalog.tableExists(ObjectPath tablePath)
+// The supported methods by Postgres & MySQL Catalog.
+databaseExists(String databaseName);
+listDatabases();
+getDatabase(String databaseName);
+listTables(String databaseName);
+getTable(ObjectPath tablePath);
+tableExists(ObjectPath tablePath);
 ```
 
 Other `Catalog` methods is unsupported now.
 
-#### Usage of PostgresCatalog
+### Usage of JDBC Catalog
 
-Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and Postgres driver.
+The section mainly describes how to create and use a `PostgresCatalog` or 
`MySQLCatalog`.
+Please refer to [Dependencies](#dependencies) section for how to setup a JDBC 
connector and Postgres/MySQL driver.
 
-Postgres catalog supports the following options:
+Postgres/MySQL catalog supports the following options:

Review comment:
       ```suggestion
   The JDBC catalog supports the following options:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to