This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 94fe1e081 [api-draft][catalog] jdbc catalog (#2042)
94fe1e081 is described below
commit 94fe1e081cf48e6095a860d431795675e871b651
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Jun 27 13:58:16 2022 +0800
[api-draft][catalog] jdbc catalog (#2042)
* [api-draft][catalog] jdbc catalog
# Conflicts:
# seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
* [api-draft][catalog] Add AbstractJdbcCatalog with defaultUrl constructor
Co-authored-by: Hisoka <[email protected]>
---
pom.xml | 20 +-
.../seatunnel/api/table/catalog/Catalog.java | 91 ++++++++
.../seatunnel/api/table/catalog/TableSchema.java | 94 ++++++++-
.../exception/DatabaseNotExistException.java | 32 +++
.../catalog/exception/TableNotExistException.java | 35 ++++
seatunnel-common/pom.xml | 5 +
.../seatunnel-connectors-seatunnel/pom.xml | 11 +
.../seatunnel-connector-seatunnel-jdbc/pom.xml | 1 +
.../jdbc/catalog/AbstractJdbcCatalog.java | 211 +++++++++++++++++++
.../seatunnel/jdbc/catalog/MySqlCatalog.java | 229 +++++++++++++++++++++
.../jdbc/catalog/AbstractJdbcCatalogTest.java | 48 +++++
11 files changed, 769 insertions(+), 8 deletions(-)
diff --git a/pom.xml b/pom.xml
index 504b8cc48..fcf707a46 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,6 +153,7 @@
<docker.tag>${project.version}</docker.tag>
<jcommander.version>1.81</jcommander.version>
<junit.version>4.13.2</junit.version>
+ <junit5.version>5.8.2</junit5.version>
<tispark.version>2.4.1</tispark.version>
<druid.version>0.22.1</druid.version>
<sshd.version>2.7.0</sshd.version>
@@ -514,6 +515,20 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit5.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <version>${junit5.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -665,11 +680,6 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 783304608..e3f83d9ff 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -17,9 +17,100 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import java.util.List;
+import java.util.Optional;
+
/**
* Interface for reading and writing table metadata from SeaTunnel. Each
connector need to contain
* the implementation of Catalog.
*/
public interface Catalog {
+
+ /**
+ * Returns the {@link Factory} associated with this catalog, if any.
+ *
+ * @return an empty {@link Optional} by default; implementations may override this
+ */
+ default Optional<Factory> getFactory() {
+ return Optional.empty();
+ }
+
+ /**
+ * Open the catalog. Used for any required preparation in the initialization phase.
+ *
+ * @throws CatalogException in case of any runtime exception
+ */
+ void open() throws CatalogException;
+
+ /**
+ * Close the catalog when it is no longer needed and release any resources that it might be
+ * holding.
+ *
+ * @throws CatalogException in case of any runtime exception
+ */
+ void close() throws CatalogException;
+
+ // --------------------------------------------------------------------------------------------
+ // database
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Get the name of the default database for this catalog. The default database will be the
+ * current database for the catalog when the user's session doesn't specify a current database.
+ * The value probably comes from configuration and will not change for the lifetime of the
+ * catalog instance.
+ *
+ * @return the name of the default database
+ * @throws CatalogException in case of any runtime exception
+ */
+ String getDefaultDatabase() throws CatalogException;
+
+ /**
+ * Check if a database exists in this catalog.
+ *
+ * @param databaseName Name of the database
+ * @return true if the given database exists in the catalog, false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ boolean databaseExists(String databaseName) throws CatalogException;
+
+ /**
+ * Get the names of all databases in this catalog.
+ *
+ * @return a list of the names of all databases
+ * @throws CatalogException in case of any runtime exception
+ */
+ List<String> listDatabases() throws CatalogException;
+
+ // --------------------------------------------------------------------------------------------
+ // table
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Get the names of all tables under the given database. An empty list is returned if none
+ * exists.
+ *
+ * @param databaseName Name of the database whose tables are listed
+ * @return a list of the names of all tables in this database
+ * @throws CatalogException in case of any runtime exception
+ * @throws DatabaseNotExistException if the database does not exist
+ */
+ List<String> listTables(String databaseName) throws CatalogException,
DatabaseNotExistException;
+
+ /**
+ * Check if a table exists in this catalog.
+ *
+ * @param tablePath Path of the table
+ * @return true if the given table exists in the catalog, false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ boolean tableExists(TablePath tablePath) throws CatalogException;
+
+ /**
+ * Return a {@link CatalogTable} identified by the given {@link TablePath}. The framework will
+ * resolve the metadata objects when necessary.
+ *
+ * @param tablePath Path of the table
+ * @return The requested table
+ * @throws CatalogException in case of any runtime exception
+ * @throws TableNotExistException if the target does not exist
+ */
+ CatalogTable getTable(TablePath tablePath) throws CatalogException,
TableNotExistException;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 04ca51698..61d99ef52 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -17,8 +17,12 @@
package org.apache.seatunnel.api.table.catalog;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* Represent a physical table schema.
@@ -27,12 +31,15 @@ public final class TableSchema implements Serializable {
private static final long serialVersionUID = 1L;
private final List<Column> columns;
- private TableSchema(List<Column> columns) {
+ private final PrimaryKey primaryKey;
+
+ private TableSchema(List<Column> columns, PrimaryKey primaryKey) {
this.columns = columns;
+ this.primaryKey = primaryKey;
}
- public static TableSchema of(List<Column> columns) {
- return new TableSchema(columns);
+ public static TableSchema.Builder builder() {
+ return new Builder();
}
/**
@@ -41,4 +48,85 @@ public final class TableSchema implements Serializable {
public List<Column> getColumns() {
return columns;
}
+
+    /**
+     * Fluent builder for {@link TableSchema}. Columns are kept in the order they are added;
+     * the primary key is optional and stays {@code null} unless one of the
+     * {@code primaryKey(...)} methods is called.
+     */
+    public static final class Builder {
+        private final List<Column> columns = new ArrayList<>();
+
+        private PrimaryKey primaryKey;
+
+        /**
+         * Appends all given columns.
+         *
+         * @param columns columns to append, in iteration order
+         * @return this builder
+         */
+        public Builder columns(List<Column> columns) {
+            this.columns.addAll(columns);
+            return this;
+        }
+
+        /**
+         * Appends a single column.
+         *
+         * @param column column to append
+         * @return this builder
+         */
+        public Builder column(Column column) {
+            this.columns.add(column);
+            return this;
+        }
+
+        /**
+         * Appends a physical column created via {@link Column#physical}.
+         *
+         * @param name column name
+         * @param dataType column data type
+         * @return this builder
+         */
+        public Builder physicalColumn(String name, SeaTunnelDataType<?> dataType) {
+            this.columns.add(Column.physical(name, dataType));
+            return this;
+        }
+
+        /**
+         * Sets the primary key, replacing any previously set one.
+         *
+         * @param primaryKey primary key definition
+         * @return this builder
+         */
+        public Builder primaryKey(PrimaryKey primaryKey) {
+            this.primaryKey = primaryKey;
+            return this;
+        }
+
+        /**
+         * Sets the primary key from its constraint name and column names, replacing any
+         * previously set one.
+         *
+         * @param constraintName name of the primary key constraint
+         * @param columnNames names of the key columns
+         * @return this builder
+         */
+        public Builder primaryKey(String constraintName, List<String> columnNames) {
+            this.primaryKey = PrimaryKey.of(constraintName, columnNames);
+            return this;
+        }
+
+        /**
+         * Creates the schema from the current builder state.
+         *
+         * @return a new {@link TableSchema}
+         */
+        public TableSchema build() {
+            // Snapshot the list so that further use of this builder cannot mutate
+            // a schema that has already been handed out.
+            return new TableSchema(new ArrayList<>(columns), primaryKey);
+        }
+    }
+
+    /**
+     * Describes a primary key constraint: its name and the names of the columns it spans.
+     */
+    public static final class PrimaryKey implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String constraintName;
+        private final List<String> columnNames;
+
+        private PrimaryKey(String constraintName, List<String> columnNames) {
+            this.constraintName = constraintName;
+            this.columnNames = columnNames;
+        }
+
+        /**
+         * Creates a primary key definition.
+         *
+         * @param constraintName name of the constraint
+         * @param columnNames names of the key columns
+         * @return the primary key
+         */
+        public static PrimaryKey of(String constraintName, List<String> columnNames) {
+            return new PrimaryKey(constraintName, columnNames);
+        }
+
+        /** @return the constraint name passed to {@link #of} */
+        public String getConstraintName() {
+            return constraintName;
+        }
+
+        /** @return the column name list passed to {@link #of} */
+        public List<String> getColumnNames() {
+            return columnNames;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("CONSTRAINT %s PRIMARY KEY (%s) NOT ENFORCED", constraintName, String.join(", ", columnNames));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PrimaryKey that = (PrimaryKey) o;
+            // Null-safe comparison, matching hashCode() which already tolerates null fields.
+            return Objects.equals(constraintName, that.constraintName)
+                && Objects.equals(columnNames, that.columnNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(constraintName, columnNames);
+        }
+    }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseNotExistException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseNotExistException.java
new file mode 100644
index 000000000..5e81505f7
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/DatabaseNotExistException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+/** Thrown when an operation targets a database that is not present in the catalog. */
+public class DatabaseNotExistException extends Exception {
+    private static final String MSG = "Database %s does not exist in Catalog %s.";
+
+    /**
+     * @param catalogName name of the catalog that was searched
+     * @param databaseName name of the missing database
+     */
+    public DatabaseNotExistException(String catalogName, String databaseName) {
+        this(catalogName, databaseName, null);
+    }
+
+    /**
+     * @param catalogName name of the catalog that was searched
+     * @param databaseName name of the missing database
+     * @param cause underlying cause, may be {@code null}
+     */
+    public DatabaseNotExistException(String catalogName, String databaseName, Throwable cause) {
+        super(describe(catalogName, databaseName), cause);
+    }
+
+    /** Builds the exception message from the database and catalog names. */
+    private static String describe(String catalogName, String databaseName) {
+        return String.format(MSG, databaseName, catalogName);
+    }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableNotExistException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableNotExistException.java
new file mode 100644
index 000000000..d37a586eb
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/TableNotExistException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+/** Thrown when an operation targets a table that is not present in the catalog. */
+public class TableNotExistException extends Exception {
+
+    private static final String MSG = "Table %s does not exist in Catalog %s.";
+
+    /**
+     * @param catalogName name of the catalog that was searched
+     * @param tablePath path of the missing table
+     * @param cause underlying cause, may be {@code null}
+     */
+    public TableNotExistException(String catalogName, TablePath tablePath, Throwable cause) {
+        super(describe(catalogName, tablePath), cause);
+    }
+
+    /**
+     * @param catalogName name of the catalog that was searched
+     * @param tablePath path of the missing table
+     */
+    public TableNotExistException(String catalogName, TablePath tablePath) {
+        this(catalogName, tablePath, null);
+    }
+
+    /** Builds the exception message from the table's full name and the catalog name. */
+    private static String describe(String catalogName, TablePath tablePath) {
+        return String.format(MSG, tablePath.getFullName(), catalogName);
+    }
+}
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index bf801a04a..aecac47f5 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -55,6 +55,11 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 6b94005e3..4c28aa789 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -43,4 +43,15 @@
<module>seatunnel-connector-seatunnel-hive</module>
</modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
+ </dependencies>
+
</project>
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
index d5c9ad279..28dd4cb5f 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
@@ -40,6 +40,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
new file mode 100644
index 000000000..852fbbe0a
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Base class for {@link Catalog} implementations that talk to a database over JDBC.
+ *
+ * <p>It holds the connection settings (user, password, base URL without database, and the URL
+ * of the default database) and provides shared helpers; {@code listDatabases},
+ * {@code listTables} and {@code getTable} are left to subclasses.
+ */
+public abstract class AbstractJdbcCatalog implements Catalog {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
+
+    protected final String catalogName;
+    protected final String defaultDatabase;
+    protected final String username;
+    protected final String pwd;
+    /** JDBC URL without a database name; always ends with {@code "/"}. */
+    protected final String baseUrl;
+    /** {@link #baseUrl} followed by {@link #defaultDatabase}. */
+    protected final String defaultUrl;
+
+    /**
+     * Creates a catalog from a base URL (without database) and a separate default database.
+     *
+     * @param catalogName name of this catalog
+     * @param defaultDatabase name of the default database, appended to the base URL
+     * @param username JDBC user, must not be blank
+     * @param pwd JDBC password, must not be blank
+     * @param baseUrl JDBC URL without database, with or without a trailing {@code "/"}
+     * @throws IllegalArgumentException if a required argument is blank or the URL contains a
+     *     database part
+     */
+    public AbstractJdbcCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+
+        checkArgument(StringUtils.isNotBlank(username), "username must not be blank");
+        checkArgument(StringUtils.isNotBlank(pwd), "password must not be blank");
+        checkArgument(StringUtils.isNotBlank(baseUrl), "baseUrl must not be blank");
+
+        baseUrl = baseUrl.trim();
+        validateJdbcUrlWithoutDatabase(baseUrl);
+        this.catalogName = catalogName;
+        this.defaultDatabase = defaultDatabase;
+        this.username = username;
+        this.pwd = pwd;
+        // Normalize so that a database name can simply be appended.
+        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+        this.defaultUrl = this.baseUrl + defaultDatabase;
+    }
+
+    /**
+     * URL has to be without database, like "jdbc:mysql://localhost:5432/" or
+     * "jdbc:mysql://localhost:5432" rather than "jdbc:mysql://localhost:5432/db".
+     *
+     * <p>The URL is split on runs of {@code '/'} and must yield exactly two segments.
+     *
+     * @param url JDBC URL to check
+     * @throws IllegalArgumentException if the URL does not have the expected shape
+     */
+    public static void validateJdbcUrlWithoutDatabase(String url) {
+        String[] parts = url.trim().split("\\/+");
+
+        checkArgument(parts.length == 2, "JDBC URL must not contain a database name: %s", url);
+    }
+
+    /**
+     * Creates a catalog from a URL that already contains the default database.
+     *
+     * @param catalogName name of this catalog
+     * @param username JDBC user, must not be blank
+     * @param pwd JDBC password, must not be blank
+     * @param defaultUrl JDBC URL including the default database
+     * @throws IllegalArgumentException if a required argument is blank or the URL has no
+     *     database part
+     */
+    public AbstractJdbcCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            String defaultUrl) {
+
+        checkArgument(StringUtils.isNotBlank(username), "username must not be blank");
+        checkArgument(StringUtils.isNotBlank(pwd), "password must not be blank");
+        checkArgument(StringUtils.isNotBlank(defaultUrl), "defaultUrl must not be blank");
+
+        defaultUrl = defaultUrl.trim();
+        validateJdbcUrlWithDatabase(defaultUrl);
+        this.catalogName = catalogName;
+        this.username = username;
+        this.pwd = pwd;
+        this.defaultUrl = defaultUrl;
+        String[] strings = splitDefaultUrl(defaultUrl);
+        this.baseUrl = strings[0];
+        this.defaultDatabase = strings[1];
+    }
+
+    /**
+     * URL has to be with database, like "jdbc:mysql://localhost:5432/db" rather than
+     * "jdbc:mysql://localhost:5432/".
+     *
+     * <p>The URL is split on runs of {@code '/'} and must yield exactly three segments.
+     *
+     * @param url JDBC URL to check
+     * @throws IllegalArgumentException if the URL does not have the expected shape
+     */
+    @SuppressWarnings("MagicNumber")
+    public static void validateJdbcUrlWithDatabase(String url) {
+        String[] parts = url.trim().split("\\/+");
+        checkArgument(parts.length == 3, "JDBC URL must contain a database name: %s", url);
+    }
+
+    /**
+     * Ensure that the url was validated {@link #validateJdbcUrlWithDatabase}.
+     *
+     * @param defaultUrl JDBC URL including the database name
+     * @return The array size is fixed at 2, index 0 is base url (up to and including the last
+     *     {@code '/'}), and index 1 is default database.
+     */
+    public static String[] splitDefaultUrl(String defaultUrl) {
+        String[] res = new String[2];
+        int index = defaultUrl.lastIndexOf("/") + 1;
+        res[0] = defaultUrl.substring(0, index);
+        res[1] = defaultUrl.substring(index);
+        return res;
+    }
+
+    @Override
+    public String getDefaultDatabase() {
+        return defaultDatabase;
+    }
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return pwd;
+    }
+
+    public String getBaseUrl() {
+        return baseUrl;
+    }
+
+    /**
+     * Opens and immediately closes a connection to the default URL so that connectivity
+     * problems surface early.
+     *
+     * @throws CatalogException if the connection cannot be established
+     */
+    @Override
+    public void open() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            // test connection, fail early if we cannot connect to database
+        } catch (SQLException e) {
+            throw new CatalogException(
+                String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
+    }
+
+    /** Logs the shutdown; no connection is held between calls, so nothing is released here. */
+    @Override
+    public void close() throws CatalogException {
+        LOG.info("Catalog {} closing", catalogName);
+    }
+
+    /**
+     * Reads the primary key of a table from JDBC metadata.
+     *
+     * @param metaData metadata of an open connection
+     * @param schema schema pattern passed to {@link DatabaseMetaData#getPrimaryKeys}
+     * @param table table name
+     * @return the primary key with its columns ordered by KEY_SEQ, or empty if the table has
+     *     no primary key
+     * @throws SQLException if the metadata lookup fails
+     */
+    protected Optional<TableSchema.PrimaryKey> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        // Close the metadata result set deterministically instead of leaking it.
+        try (ResultSet rs = metaData.getPrimaryKeys(null, schema, table)) {
+            while (rs.next()) {
+                String columnName = rs.getString("COLUMN_NAME");
+                // all the PK_NAME should be the same
+                pkName = rs.getString("PK_NAME");
+                int keySeq = rs.getInt("KEY_SEQ");
+                // KEY_SEQ is 1-based index
+                keySeqColumnName.put(keySeq - 1, columnName);
+            }
+        }
+        // Fixed-size list pre-filled with nulls so each column can be placed at its KEY_SEQ slot.
+        List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]);
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME may be null according to the javadoc, generate a unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(TableSchema.PrimaryKey.of(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Implemented as a lookup in {@link #listDatabases()}.
+     *
+     * @throws IllegalArgumentException if {@code databaseName} is blank
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(StringUtils.isNotBlank(databaseName), "databaseName must not be blank");
+
+        return listDatabases().contains(databaseName);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code false} when the database itself does not exist.
+     */
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
new file mode 100644
index 000000000..69638d042
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import com.mysql.cj.MysqlType;
+import com.mysql.cj.jdbc.result.ResultSetImpl;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * {@link AbstractJdbcCatalog} implementation for MySQL. Databases and tables are discovered
+ * with {@code SHOW DATABASES} / {@code SHOW TABLES}; column types are mapped from
+ * {@link MysqlType} to SeaTunnel data types.
+ */
+public class MySqlCatalog extends AbstractJdbcCatalog {
+
+    /** MySQL system databases that are hidden from {@link #listDatabases()}. */
+    private static final Set<String> SYS_DATABASES = new HashSet<>(4);
+
+    static {
+        SYS_DATABASES.add("information_schema");
+        SYS_DATABASES.add("mysql");
+        SYS_DATABASES.add("performance_schema");
+        SYS_DATABASES.add("sys");
+    }
+
+    public MySqlCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+    }
+
+    public MySqlCatalog(String catalogName, String username, String pwd, String defaultUrl) {
+        super(catalogName, username, pwd, defaultUrl);
+    }
+
+    /**
+     * Lists all databases except the MySQL system databases.
+     *
+     * @return names of the non-system databases
+     * @throws CatalogException if the query fails
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        // Statement and result set are declared as resources so they are closed with the connection.
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+             PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+             ResultSet rs = ps.executeQuery()) {
+
+            List<String> databases = new ArrayList<>();
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(databaseName);
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", this.catalogName), e);
+        }
+    }
+
+    /**
+     * Lists the tables of the given database.
+     *
+     * @param databaseName name of the database
+     * @return names of the tables in that database
+     * @throws CatalogException if the query fails
+     * @throws DatabaseNotExistException if the database is not returned by {@link #listDatabases()}
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, databaseName);
+        }
+
+        try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd);
+             PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
+             ResultSet rs = ps.executeQuery()) {
+
+            List<String> tables = new ArrayList<>();
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing tables of database %s in catalog %s", databaseName, catalogName), e);
+        }
+    }
+
+    /**
+     * Builds a {@link CatalogTable} from the JDBC metadata of the given table.
+     *
+     * @param tablePath path of the table
+     * @return the table with its columns, primary key (if any) and connector options
+     * @throws CatalogException if metadata cannot be read or a column type is unsupported
+     * @throws TableNotExistException if the table does not exist
+     */
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<TableSchema.PrimaryKey> primaryKey =
+                getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
+
+            TableSchema.Builder builder = TableSchema.builder();
+            // NOTE(review): the table name is an identifier and cannot be bound as a parameter;
+            // it is only reached after tableExists() matched it against SHOW TABLES output.
+            String probeSql = String.format("SELECT * FROM %s LIMIT 1;", tablePath.getFullName());
+            // The statement is only prepared (never executed) to obtain the column metadata.
+            try (PreparedStatement ps = conn.prepareStatement(probeSql)) {
+                ResultSetMetaData tableMetaData = ps.getMetaData();
+                for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                    SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                    builder.physicalColumn(tableMetaData.getColumnName(i), type);
+                }
+            }
+
+            primaryKey.ifPresent(builder::primaryKey);
+
+            TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
+            return CatalogTable.of(tableIdentifier, builder.build(), buildConnectorOptions(tablePath), Collections.emptyList(), "");
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    /**
+     * Maps the MySQL type of a result-set column to a SeaTunnel data type.
+     *
+     * @param metadata result-set metadata of the table
+     * @param colIndex 1-based column index
+     * @return the matching SeaTunnel type
+     * @throws SQLException if the column type name cannot be read
+     * @throws UnsupportedOperationException if the MySQL type has no mapping yet
+     * @see com.mysql.cj.MysqlType
+     * @see ResultSetImpl#getObjectStoredProc(int, int)
+     */
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex));
+        switch (mysqlType) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BIT:
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            // Unsigned types are widened to the next larger signed type.
+            case TINYINT_UNSIGNED:
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SMALLINT_UNSIGNED:
+            case INT:
+            case MEDIUMINT:
+            case MEDIUMINT_UNSIGNED:
+                return BasicType.INT_TYPE;
+            case INT_UNSIGNED:
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+                return BasicType.DOUBLE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            // TODO: to confirm
+            case CHAR:
+            case VARCHAR:
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case JSON:
+            case ENUM:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case GEOMETRY:
+                return PrimitiveArrayType.PRIMITIVE_BYTE_ARRAY_TYPE;
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                // Precision and scale are not propagated; the returned decimal type carries neither.
+                return BasicType.BIG_DECIMAL_TYPE;
+            // TODO: support 'SET' & 'YEAR' type
+            default:
+                throw new UnsupportedOperationException(String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
+        }
+    }
+
+    /**
+     * Builds the connector option map for a table.
+     *
+     * @param tablePath path of the table
+     * @return options with the keys connector, url, table-name, username and password
+     */
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "jdbc");
+        options.put("url", baseUrl + tablePath.getDatabaseName());
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
new file mode 100644
index 000000000..251c24d21
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalogTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Tests for the static URL helpers of {@link AbstractJdbcCatalog}. */
+public class AbstractJdbcCatalogTest {
+
+    /** Base URLs (no database part) pass the "without database" check and fail the "with database" check. */
+    @ParameterizedTest
+    @ValueSource(strings = {
+        "jdbc:mysql://localhost:5432/",
+        "jdbc:mysql://localhost:5432",
+        "jdbc:postgresql://localhost:5432/",
+        "jdbc:postgresql://localhost:5432"})
+    public void testValidateJdbcBaseUrl(String baseUrl) {
+        Assertions.assertDoesNotThrow(() -> AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(baseUrl));
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> AbstractJdbcCatalog.validateJdbcUrlWithDatabase(baseUrl));
+    }
+
+    /** URLs with a database part pass the "with database" check and fail the "without database" check. */
+    @ParameterizedTest
+    @ValueSource(strings = {
+        "jdbc:mysql://localhost:5432/db",
+        "jdbc:postgresql://localhost:5432/db"})
+    public void testValidateJdbcDefault(String defaultUrl) {
+        Assertions.assertDoesNotThrow(() -> AbstractJdbcCatalog.validateJdbcUrlWithDatabase(defaultUrl));
+        Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> AbstractJdbcCatalog.validateJdbcUrlWithoutDatabase(defaultUrl));
+    }
+
+    /** A default URL is split into the base URL (with trailing slash) and the database name. */
+    @ParameterizedTest
+    @CsvSource({"jdbc:mysql://localhost:5432/db, jdbc:mysql://localhost:5432/, db",
+        "jdbc:postgresql://localhost:5432/db, jdbc:postgresql://localhost:5432/, db"})
+    public void testSplitDefaultUrl(String defaultUrl, String expectedUrl, String expectedDatabase) {
+        Assertions.assertArrayEquals(new String[] {expectedUrl, expectedDatabase}, AbstractJdbcCatalog.splitDefaultUrl(defaultUrl));
+    }
+}