This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 180bfb23d [feature] Introduce flink jdbc catalog store plugin. (#3914)
180bfb23d is described below
commit 180bfb23dac5b48e3dbc007918f5a0028cd3332c
Author: ouyangwulin <[email protected]>
AuthorDate: Wed Jul 31 20:07:00 2024 +0800
[feature] Introduce flink jdbc catalog store plugin. (#3914)
* [catalog] Introduce flink jdbc catalog store plugin.
* fixed delete where unmatched
* improve jdbc close method.
* [catalog] refactor project module to stream-flink.
* [catalog] refactor package name
* [catalog] delete unused logs
---
.idea/vcs.xml | 28 ++--
streampark-flink/pom.xml | 6 +-
.../pom.xml | 98 +++++-------
.../apache/streampark/catalog/JacksonUtils.java | 54 +++++++
.../streampark/catalog/JdbcCatalogStore.java | 175 +++++++++++++++++++++
.../catalog/JdbcCatalogStoreFactory.java | 101 ++++++++++++
.../catalog/JdbcCatalogStoreFactoryOptions.java | 71 +++++++++
.../catalog/connections/JdbcConnectionOptions.java | 153 ++++++++++++++++++
.../connections/JdbcConnectionProvider.java | 85 ++++++++++
.../connections/SimpleJdbcConnectionProvider.java | 150 ++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../catalog/JdbcCatalogStoreFactoryTest.java | 56 +++++++
.../streampark/catalog/JdbcCatalogStoreTest.java | 137 ++++++++++++++++
.../streampark/catalog/mysql/MySqlContainer.java | 175 +++++++++++++++++++++
.../streampark/catalog/mysql/MysqlBaseITCASE.java | 57 +++++++
.../src/test/resources/docker/server/my.cnf | 64 ++++++++
.../src/test/resources/docker/setup.sql | 41 +++++
.../streampark-flink-shims_flink-1.18/pom.xml | 2 +-
18 files changed, 1396 insertions(+), 73 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index eef8c2d27..48b645033 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -16,17 +16,17 @@
~ limitations under the License.
-->
<project version="4">
- <component name="VcsDirectoryMappings">
- <mapping directory="$PROJECT_DIR$" vcs="Git" />
- </component>
- <component name="IssueNavigationConfiguration">
- <option name="links">
- <list>
- <IssueNavigationLink>
- <option name="issueRegexp" value="#(\d+)" />
- <option name="linkRegexp"
value="https://github.com/apache/incubator-streampark/pull/$1" />
- </IssueNavigationLink>
- </list>
- </option>
- </component>
-</project>
+ <component name="VcsDirectoryMappings">
+ <mapping directory="$PROJECT_DIR$" vcs="Git" />
+ </component>
+ <component name="IssueNavigationConfiguration">
+ <option name="links">
+ <list>
+ <IssueNavigationLink>
+ <option name="issueRegexp" value="#(\d+)" />
+ <option name="linkRegexp"
value="https://github.com/apache/incubator-streampark/pull/$1" />
+ </IssueNavigationLink>
+ </list>
+ </option>
+ </component>
+</project>
\ No newline at end of file
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index c3befc8cf..ea053a2e5 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -38,6 +38,7 @@
<module>streampark-flink-packer</module>
<module>streampark-flink-kubernetes</module>
<module>streampark-flink-sql-gateway</module>
+ <module>streampark-flink-catalog-store</module>
</modules>
<dependencies>
@@ -62,7 +63,10 @@
<version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
b/streampark-flink/streampark-flink-catalog-store/pom.xml
similarity index 63%
copy from
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
copy to streampark-flink/streampark-flink-catalog-store/pom.xml
index db8ff46cf..78223ed55 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
+++ b/streampark-flink/streampark-flink-catalog-store/pom.xml
@@ -16,103 +16,87 @@
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-
<parent>
<groupId>org.apache.streampark</groupId>
- <artifactId>streampark-flink-shims</artifactId>
+ <artifactId>streampark-flink</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>
-
<artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
- <name>StreamPark : Flink Shims 1.18</name>
+ <artifactId>streampark-flink-catalog-store</artifactId>
+ <name>StreamPark : Flink Catalog Store</name>
<properties>
- <flink.version>1.18.0</flink.version>
+ <flink.shaded.version19>19.0</flink.shaded.version19>
+ <flink.version>1.18.1</flink.version>
+ <flink.shaded.jackson.version>2.15.3</flink.shaded.jackson.version>
</properties>
<dependencies>
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+ <artifactId>flink-annotations</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-yarn</artifactId>
- <version>${flink.version}</version>
+ <artifactId>flink-shaded-jackson</artifactId>
+
<version>${flink.shaded.jackson.version}-${flink.shaded.version19}</version>
<scope>provided</scope>
</dependency>
-
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-api</artifactId>
- <optional>true</optional>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-runtime</artifactId>
- <optional>true</optional>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-kubernetes</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>1.19.0</version>
+ <scope>test</scope>
</dependency>
</dependencies>
@@ -132,7 +116,7 @@
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
<artifactSet>
<includes>
-
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+ <include>org.apache.streampark:*</include>
</includes>
</artifactSet>
<filters>
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
new file mode 100644
index 000000000..11cb6906e
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+/** Serialization utils */
+public final class JacksonUtils {
+
+ private JacksonUtils() {
+ }
+
+ private static final ObjectMapper MAPPER;
+
+ static {
+ MAPPER = new ObjectMapper();
+ MAPPER.registerModule(new SimpleModule());
+ MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,
false);
+ }
+
+ public static <T> T read(String json, Class<T> clazz) throws
JsonProcessingException {
+ return MAPPER.readValue(json, clazz);
+ }
+
+ public static <T> T read(String json, TypeReference<T> typeReference)
throws JsonProcessingException {
+ return MAPPER.readValue(json, typeReference);
+ }
+
+ public static String write(Object object) throws JsonProcessingException {
+ return MAPPER.writeValueAsString(object);
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
new file mode 100644
index 000000000..feeb76841
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Catalog Store for Jdbc.
+ */
+public class JdbcCatalogStore extends AbstractCatalogStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcCatalogStore.class);
+ private final JdbcConnectionProvider jdbcConnectionProvider;
+ private transient Connection connection;
+ private transient Statement statement;
+ private transient ResultSet resultSet;
+
+ private final String catalogTableName;
+
+ public JdbcCatalogStore(JdbcConnectionProvider jdbcConnectionProvider,
String catalogTableName) {
+ this.jdbcConnectionProvider = jdbcConnectionProvider;
+ this.catalogTableName = catalogTableName;
+ }
+
+ @Override
+ public void open() {
+ try {
+ this.connection =
jdbcConnectionProvider.getOrEstablishConnection();
+ this.statement = this.connection.createStatement();
+ super.open();
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ closeResultSetAndStatement();
+ try {
+ jdbcConnectionProvider.close();
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
+ super.close();
+ }
+
+ @Override
+ public void storeCatalog(String catalogName, CatalogDescriptor
catalogDescriptor) throws CatalogException {
+ checkOpenState();
+ try {
+ if (!contains(catalogName)) {
+ statement.executeUpdate(String.format(
+ "insert into %s
(catalog_name,configuration,create_time,update_time) values
('%s','%s',now(),now())",
+ this.catalogTableName, catalogName,
+
JacksonUtils.write(catalogDescriptor.getConfiguration().toMap())));
+ } else {
+ LOG.error("catalog {} is exist.", catalogName);
+ }
+ } catch (SQLException | JsonProcessingException e) {
+ throw new CatalogException(String.format("Store catalog %s
failed!", catalogName), e);
+ }
+ }
+
+ @Override
+ public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
throws CatalogException {
+ checkOpenState();
+ try {
+ int effectRow = statement.executeUpdate(
+ String.format("delete from %s where catalog_name='%s'",
this.catalogTableName, catalogName));
+
+ if (effectRow == 0 && !ignoreIfNotExists) {
+ throw new CatalogException(String.format("Remove catalog %s
failed!", catalogName));
+ }
+ } catch (SQLException e) {
+ LOG.error("Remove catalog {} failed!", catalogName, e);
+ throw new CatalogException(String.format("Remove catalog %s
failed!", catalogName));
+ }
+ }
+
+ @Override
+ public Optional<CatalogDescriptor> getCatalog(String catalogName) throws
CatalogException {
+ checkOpenState();
+ try {
+ resultSet = statement
+ .executeQuery(
+ String.format("select * from %s where catalog_name='%s'",
this.catalogTableName, catalogName));
+ while (resultSet.next()) {
+ return Optional.of(CatalogDescriptor.of(catalogName,
+
Configuration.fromMap(JacksonUtils.read(resultSet.getString("configuration"),
Map.class))));
+ }
+ } catch (SQLException | JsonProcessingException e) {
+ throw new CatalogException(String.format("Get catalog %s failed!",
catalogName), e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Set<String> listCatalogs() throws CatalogException {
+ checkOpenState();
+ Set<String> catalogs = new HashSet<>();
+ try {
+ resultSet = statement.executeQuery(String.format("select * from
%s;", this.catalogTableName));
+ while (resultSet.next()) {
+ catalogs.add(resultSet.getString("catalog_name"));
+ }
+ return catalogs;
+ } catch (SQLException e) {
+ throw new CatalogException("List catalogs failed!", e);
+ }
+ }
+
+ @Override
+ public boolean contains(String catalogName) throws CatalogException {
+ checkOpenState();
+ try {
+ resultSet = statement.executeQuery(
+ String.format("select * from %s where catalog_name='%s';",
this.catalogTableName, catalogName));
+ while (resultSet.next()) {
+ resultSet.getString("catalog_name");
+ return true;
+ }
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Catalog %s is contains
failed!", catalogName), e);
+ }
+ return false;
+ }
+
+ private void closeResultSetAndStatement() {
+ try {
+ if (resultSet != null && !resultSet.isClosed()) {
+ resultSet.close();
+ }
+ if (statement != null && !statement.isClosed()) {
+ statement.close();
+ }
+ resultSet = null;
+ statement = null;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
new file mode 100644
index 000000000..38aa6e5c7
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static
org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
+import static
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.DRIVER;
+import static
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.IDENTIFIER;
+import static
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.MAX_RETRY_TIMEOUT;
+import static
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.PASSWORD;
+import static
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.TABLE_NAME;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.URL;
+import static
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.USERNAME;
+
+/** Catalog Store Factory for Jdbc. */
+public class JdbcCatalogStoreFactory implements CatalogStoreFactory {
+
+ private JdbcConnectionProvider jdbcConnectionProvider;
+ private transient String catalogTableName;
+
+ @Override
+ public CatalogStore createCatalogStore() {
+ return new JdbcCatalogStore(jdbcConnectionProvider,
this.catalogTableName);
+ }
+
+ @Override
+ public void open(Context context) {
+ FactoryUtil.FactoryHelper<CatalogStoreFactory> factoryHelper =
+ createCatalogStoreFactoryHelper(this, context);
+ factoryHelper.validate();
+
+ ReadableConfig options = factoryHelper.getOptions();
+ JdbcConnectionOptions jdbcConnectionOptions =
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(options.get(URL))
+ .withDriverName(options.get(DRIVER))
+ .withUsername(options.get(USERNAME))
+ .withPassword(options.get(PASSWORD))
+
.withConnectionCheckTimeoutSeconds(options.get(MAX_RETRY_TIMEOUT))
+ .build();
+
+ this.catalogTableName = options.get(TABLE_NAME);
+ this.jdbcConnectionProvider = new
SimpleJdbcConnectionProvider(jdbcConnectionOptions);
+ }
+
+ @Override
+ public void close() {
+ this.jdbcConnectionProvider.closeConnection();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(URL);
+ options.add(DRIVER);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(TABLE_NAME);
+ return Collections.unmodifiableSet(options);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(MAX_RETRY_TIMEOUT);
+ return Collections.unmodifiableSet(options);
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
new file mode 100644
index 000000000..85a324bb5
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Catalog Store Options for Jdbc.
+ */
+public class JdbcCatalogStoreFactoryOptions {
+
+ public static final String IDENTIFIER = "jdbc";
+
+ public static final ConfigOption<String> URL =
+ ConfigOptions.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The JDBC database url.");
+ public static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The name of JDBC table to connect.");
+ public static final ConfigOption<String> DRIVER =
+ ConfigOptions.key("driver")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The class name of the JDBC driver to use to connect to this
URL, if not set, it will automatically be derived from the URL.");
+ public static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The JDBC user name. 'username' and 'password' must both be
specified if any of them is specified.");
+
+ public static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The JDBC password.");
+
+ public static final ConfigOption<Integer> MAX_RETRY_TIMEOUT =
+ ConfigOptions.key("max-retry-timeout")
+ .intType()
+ .defaultValue(60)
+ .withDescription(
+ "Maximum timeout between retries. The timeout should be in
second granularity and shouldn't be smaller than 1 second.");
+
+ private JdbcCatalogStoreFactoryOptions() {
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
new file mode 100644
index 000000000..62f460d4e
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+public class JdbcConnectionOptions implements Serializable {
+
+ public static final String USER_KEY = "user";
+ public static final String PASSWORD_KEY = "password";
+
+ private static final long serialVersionUID = 1L;
+
+ protected final String url;
+ @Nullable
+ protected final String driverName;
+ protected final int connectionCheckTimeoutSeconds;
+ @Nonnull
+ protected final Properties properties;
+
+ protected JdbcConnectionOptions(
+ String url,
+ @Nullable String driverName,
+ int connectionCheckTimeoutSeconds,
+ @Nonnull Properties properties) {
+ Preconditions.checkArgument(
+ connectionCheckTimeoutSeconds > 0,
+ "Connection check timeout seconds shouldn't be smaller than 1");
+ this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
+ this.driverName = driverName;
+ this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+ this.properties =
+ Preconditions.checkNotNull(properties, "Connection properties must
be non-null");
+ }
+
+ public String getDbURL() {
+ return url;
+ }
+
+ @Nullable
+ public String getDriverName() {
+ return driverName;
+ }
+
+ public Optional<String> getUsername() {
+ return Optional.ofNullable(properties.getProperty(USER_KEY));
+ }
+
+ public Optional<String> getPassword() {
+ return Optional.ofNullable(properties.getProperty(PASSWORD_KEY));
+ }
+
+ public int getConnectionCheckTimeoutSeconds() {
+ return connectionCheckTimeoutSeconds;
+ }
+
+ @Nonnull
+ public Properties getProperties() {
+ return properties;
+ }
+
+ @Nonnull
+ public static Properties getBriefAuthProperties(String user, String
password) {
+ final Properties result = new Properties();
+ if (Objects.nonNull(user)) {
+ result.put(USER_KEY, user);
+ }
+ if (Objects.nonNull(password)) {
+ result.put(PASSWORD_KEY, password);
+ }
+ return result;
+ }
+
+ /** Builder for {@link JdbcConnectionOptions}. */
+ public static class JdbcConnectionOptionsBuilder {
+
+ private String url;
+ private String driverName;
+ private int connectionCheckTimeoutSeconds = 60;
+ private final Properties properties = new Properties();
+
+ public JdbcConnectionOptionsBuilder withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
+ this.driverName = driverName;
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withProperty(String propKey,
String propVal) {
+ Preconditions.checkNotNull(propKey, "Connection property key
mustn't be null");
+ Preconditions.checkNotNull(propVal, "Connection property value
mustn't be null");
+ this.properties.put(propKey, propVal);
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withUsername(String username) {
+ if (Objects.nonNull(username)) {
+ this.properties.put(USER_KEY, username);
+ }
+ return this;
+ }
+
+ public JdbcConnectionOptionsBuilder withPassword(String password) {
+ if (Objects.nonNull(password)) {
+ this.properties.put(PASSWORD_KEY, password);
+ }
+ return this;
+ }
+
+ /**
+ * Set the maximum timeout between retries, default is 60 seconds.
+ *
+ * @param connectionCheckTimeoutSeconds the timeout seconds, shouldn't
smaller than 1
+ * second.
+ */
+ public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(
+
int connectionCheckTimeoutSeconds) {
+ this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+ return this;
+ }
+
+ public JdbcConnectionOptions build() {
+ return new JdbcConnectionOptions(
+ url, driverName, connectionCheckTimeoutSeconds, properties);
+ }
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
new file mode 100644
index 000000000..c49cb1fc7
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
/**
 * JDBC connection provider. Exposes lifecycle operations (establish, validate,
 * close, re-establish) for a single underlying {@link Connection}.
 */
@PublicEvolving
public interface JdbcConnectionProvider extends Serializable, AutoCloseable {

    /**
     * Get existing connection.
     *
     * @return existing connection
     */
    @Nullable
    Connection getConnection();

    /**
     * Get existing connection properties.
     *
     * @return existing connection properties
     */
    @Nonnull
    default Properties getProperties() {
        throw new UnsupportedOperationException();
    }

    /**
     * Check whether possible existing connection is valid or not through {@link
     * Connection#isValid(int)}.
     *
     * @return true if existing connection is valid
     * @throws SQLException sql exception throw from {@link Connection#isValid(int)}
     */
    boolean isConnectionValid() throws SQLException;

    /**
     * Get existing connection or establish an new one if there is none.
     *
     * @return existing connection or newly established connection
     * @throws SQLException sql exception
     * @throws ClassNotFoundException driver class not found
     */
    Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;

    /** Close possible existing connection. */
    void closeConnection();

    /**
     * Close possible existing connection and establish an new one.
     *
     * @return newly established connection
     * @throws SQLException sql exception
     * @throws ClassNotFoundException driver class not found
     */
    Connection reestablishConnection() throws SQLException, ClassNotFoundException;

    /** Close this provider by closing any possible existing connection. */
    default void close() throws Exception {
        closeConnection();
    }
}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
new file mode 100644
index 000000000..e78c0bfbe
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/** Simple JDBC connection provider. */
+@NotThreadSafe
+@PublicEvolving
+public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider,
Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private final JdbcConnectionOptions jdbcOptions;
+
+ private transient Driver loadedDriver;
+ private transient Connection connection;
+
+ static {
+ // Load DriverManager first to avoid deadlock between DriverManager's
+ // static initialization block and specific driver class's static
+ // initialization block when two different driver classes are loading
+ // concurrently using Class.forName while DriverManager is
uninitialized
+ // before.
+ //
+ // This could happen in JDK 8 but not above as driver loading has been
+ // moved out of DriverManager's static initialization block since JDK
9.
+ DriverManager.getDrivers();
+ }
+
+ public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
+ this.jdbcOptions = jdbcOptions;
+ }
+
+ @Override
+ public Connection getConnection() {
+ return connection;
+ }
+
+ @Nonnull
+ @Override
+ public Properties getProperties() {
+ return jdbcOptions.getProperties();
+ }
+
+ @Override
+ public boolean isConnectionValid() throws SQLException {
+ return connection != null
+ &&
connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
+ }
+
+ private Driver loadDriver(String driverName) throws SQLException,
ClassNotFoundException {
+ Preconditions.checkNotNull(driverName);
+ Enumeration<Driver> drivers = DriverManager.getDrivers();
+ while (drivers.hasMoreElements()) {
+ Driver driver = drivers.nextElement();
+ if (driver.getClass().getName().equals(driverName)) {
+ return driver;
+ }
+ }
+ // We could reach here for reasons:
+ // * Class loader hell of DriverManager(see JDK-8146872).
+ // * driver is not installed as a service provider.
+ Class<?> clazz =
+ Class.forName(driverName, true,
Thread.currentThread().getContextClassLoader());
+ try {
+ return (Driver) clazz.newInstance();
+ } catch (Exception ex) {
+ throw new SQLException("Fail to create driver of class " +
driverName, ex);
+ }
+ }
+
+ private Driver getLoadedDriver() throws SQLException,
ClassNotFoundException {
+ if (loadedDriver == null) {
+ loadedDriver = loadDriver(jdbcOptions.getDriverName());
+ }
+ return loadedDriver;
+ }
+
+ @Override
+ public Connection getOrEstablishConnection() throws SQLException,
ClassNotFoundException {
+ if (isConnectionValid()) {
+ return connection;
+ }
+ if (jdbcOptions.getDriverName() == null) {
+ connection = DriverManager.getConnection(jdbcOptions.getDbURL(),
getProperties());
+ } else {
+ Driver driver = getLoadedDriver();
+ connection = driver.connect(jdbcOptions.getDbURL(),
getProperties());
+ if (connection == null) {
+ // Throw same exception as DriverManager.getConnection when no
driver found to match
+ // caller expectation.
+ throw new SQLException(
+ "No suitable driver found for " + jdbcOptions.getDbURL(),
"08001");
+ }
+ }
+ return connection;
+ }
+
+ @Override
+ public void closeConnection() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.warn("JDBC connection close failed.", e);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ @Override
+ public Connection reestablishConnection() throws SQLException,
ClassNotFoundException {
+ closeConnection();
+ return getOrEstablishConnection();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..9bcdf4448
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.streampark.catalog.JdbcCatalogStoreFactory
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
new file mode 100644
index 000000000..59842e1b4
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
+
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+public class JdbcCatalogStoreFactoryTest extends MysqlBaseITCASE {
+
+ @org.junit.Test
+ public void testCatalogStoreFactoryDiscovery() {
+
+ String factoryIdentifier = JdbcCatalogStoreFactoryOptions.IDENTIFIER;
+ Map<String, String> options = new HashMap<>();
+ options.put("url", MYSQL_CONTAINER.getJdbcUrl());
+ options.put("table-name", "t_mysql_catalog");
+ options.put("driver", MYSQL_CONTAINER.getDriverClassName());
+ options.put("username", MYSQL_CONTAINER.getUsername());
+ options.put("password", MYSQL_CONTAINER.getPassword());
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ final FactoryUtil.DefaultCatalogStoreContext discoveryContext =
+ new FactoryUtil.DefaultCatalogStoreContext(options, null,
classLoader);
+ final CatalogStoreFactory factory =
+ FactoryUtil.discoverFactory(
+ classLoader, CatalogStoreFactory.class, factoryIdentifier);
+ factory.open(discoveryContext);
+
+ CatalogStore catalogStore = factory.createCatalogStore();
+ assertThat(catalogStore instanceof JdbcCatalogStore).isTrue();
+
+ factory.close();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
new file mode 100644
index 000000000..626768863
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
+import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.assertj.core.api.ThrowableAssert;
+
+import java.util.Set;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+public class JdbcCatalogStoreTest extends MysqlBaseITCASE {
+
+ private static final String DUMMY = "dummy";
+ private static final CatalogDescriptor DUMMY_CATALOG;
+
+ static {
+ Configuration conf = new Configuration();
+ conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY);
+ conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE,
"dummy_db");
+
+ DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf);
+ }
+ @org.junit.Test
+ public void testNotOpened() {
+ CatalogStore catalogStore = initCatalogStore();
+
+ assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+ assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY,
DUMMY_CATALOG));
+ assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY,
true));
+ }
+
+ @org.junit.Test
+ public void testStore() {
+ CatalogStore catalogStore = initCatalogStore();
+ catalogStore.open();
+
+ catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+
+ Set<String> storedCatalogs = catalogStore.listCatalogs();
+ assertThat(storedCatalogs.size()).isEqualTo(1);
+ assertThat(storedCatalogs.contains(DUMMY)).isTrue();
+ }
+
+ @org.junit.Test
+ public void testRemoveExisting() {
+ CatalogStore catalogStore = initCatalogStore();
+ catalogStore.open();
+
+ catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+ assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
+
+ catalogStore.removeCatalog(DUMMY, false);
+ assertThat(catalogStore.listCatalogs().size()).isEqualTo(0);
+ assertThat(catalogStore.contains(DUMMY)).isFalse();
+ }
+
+ @org.junit.Test
+ public void testRemoveNonExisting() {
+ CatalogStore catalogStore = initCatalogStore();
+ catalogStore.open();
+
+ catalogStore.removeCatalog(DUMMY, true);
+
+ assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false))
+ .isInstanceOf(CatalogException.class)
+ .hasMessageContaining(
+ "Remove catalog " + DUMMY + " failed!");
+ }
+
+ @org.junit.Test
+ public void testClosed() {
+ CatalogStore catalogStore = initCatalogStore();
+
+ catalogStore.open();
+
+ catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+ assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
+
+ catalogStore.close();
+
+ assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+ assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+ assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY,
DUMMY_CATALOG));
+ assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY,
true));
+ }
+
+ private void assertCatalogStoreNotOpened(
+ ThrowableAssert.ThrowingCallable
shouldRaiseThrowable) {
+ assertThatThrownBy(shouldRaiseThrowable)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("CatalogStore is not opened yet.");
+ }
+
+ private JdbcCatalogStore initCatalogStore() {
+ JdbcConnectionOptions jdbcConnectionOptions = new
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(MYSQL_CONTAINER.getJdbcUrl())
+ .withDriverName(MYSQL_CONTAINER.getDriverClassName())
+ .withUsername(MYSQL_CONTAINER.getUsername())
+ .withPassword(MYSQL_CONTAINER.getPassword())
+ .build();
+
+ JdbcConnectionProvider jdbcConnectionProvider = new
SimpleJdbcConnectionProvider(jdbcConnectionOptions);
+ return new JdbcCatalogStore(jdbcConnectionProvider, "t_mysql_catalog");
+
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
new file mode 100644
index 000000000..94a5c2f7e
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.mysql;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class MySqlContainer extends JdbcDatabaseContainer {
+
+ public static final String IMAGE = "mysql";
+
+ public static final String MYSQL_VERSION = "8.0";
+ public static final Integer MYSQL_PORT = 3306;
+
+ private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+ private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+ private static final String MYSQL_ROOT_USER = "root";
+
+ private String databaseName = "test";
+ private String username = "test";
+ private String password = "test";
+
+ public MySqlContainer() {
+ this(MYSQL_VERSION);
+ }
+
+ public MySqlContainer(String version) {
+ super(DockerImageName.parse(IMAGE + ":" + version));
+ addExposedPort(MYSQL_PORT);
+ }
+
+ @Override
+ protected Set<Integer> getLivenessCheckPorts() {
+ return new HashSet<>(getMappedPort(MYSQL_PORT));
+ }
+
+ @Override
+ protected void configure() {
+ optionallyMapResourceParameterAsVolume(
+ MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/",
"mysql-default-conf");
+
+ if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+ optionallyMapResourceParameterAsVolume(
+ SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+ }
+
+ addEnv("MYSQL_DATABASE", databaseName);
+ addEnv("MYSQL_USER", username);
+ if (password != null && !password.isEmpty()) {
+ addEnv("MYSQL_PASSWORD", password);
+ addEnv("MYSQL_ROOT_PASSWORD", password);
+ } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+ addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+ } else {
+ throw new ContainerLaunchException(
+ "Empty password can be used only with the root user");
+ }
+ setStartupAttempts(3);
+ }
+
+ @Override
+ public String getDriverClassName() {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ return "com.mysql.cj.jdbc.Driver";
+ } catch (ClassNotFoundException e) {
+ return "com.mysql.jdbc.Driver";
+ }
+ }
+
+ public String getJdbcUrl(String databaseName) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getDatabasePort()
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl(databaseName);
+ }
+
+ public int getDatabasePort() {
+ return getMappedPort(MYSQL_PORT);
+ }
+
+ @Override
+ protected String constructUrlForConnection(String queryString) {
+ String url = super.constructUrlForConnection(queryString);
+
+ if (!url.contains("useSSL=")) {
+ String separator = url.contains("?") ? "&" : "?";
+ url = url + separator + "useSSL=false";
+ }
+
+ if (!url.contains("allowPublicKeyRetrieval=")) {
+ url = url + "&allowPublicKeyRetrieval=true";
+ }
+
+ return url;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ @Override
+ public String getUsername() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ @SuppressWarnings("unchecked")
+ public MySqlContainer withConfigurationOverride(String s) {
+ parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public MySqlContainer withSetupSQL(String sqlPath) {
+ parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withDatabaseName(final String databaseName) {
+ this.databaseName = databaseName;
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withUsername(final String username) {
+ this.username = username;
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withPassword(final String password) {
+ this.password = password;
+ return this;
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
new file mode 100644
index 000000000..799335b96
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.mysql;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.stream.Stream;
+
+public class MysqlBaseITCASE {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MysqlBaseITCASE.class);
+ protected static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer("docker/server/my.cnf");
+
+ @org.junit.BeforeClass
+ public static void startContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @org.junit.AfterClass
+ public static void stopContainers() {
+ LOG.info("Stopping containers...");
+ if (MYSQL_CONTAINER != null) {
+ MYSQL_CONTAINER.stop();
+ }
+ LOG.info("Containers are stopped.");
+ }
+
+ protected static MySqlContainer createMySqlContainer(String configPath) {
+ return (MySqlContainer) new
MySqlContainer(MySqlContainer.MYSQL_VERSION)
+ .withConfigurationOverride(configPath)
+ .withSetupSQL("docker/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ }
+}
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
new file mode 100644
index 000000000..953ffe8f9
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but
would
+# be longer on a production system. Row-level info is required for ingest to
work.
+# Server ID is required, but this will vary on production systems.
+server-id = 223344
+log_bin = mysql-bin
+binlog_format = row
+# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the
exception
+# message when the binlog expires in the server.
+binlog_expire_logs_seconds = 1
+max_binlog_size = 4096
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git
a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
new file mode 100644
index 000000000..1b22107de
--- /dev/null
+++
b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
@@ -0,0 +1,41 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
-- In production you would almost certainly restrict the replication user to the
-- follower (replica) machine, to prevent other clients from accessing the log
-- from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog
reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;

-- The database name contains a hyphen, so the identifier must be backtick-quoted;
-- the unquoted `USE flink-test` is a MySQL syntax error and aborts the init script.
USE `flink-test`;

-- Backing table for the JDBC catalog store: one row per registered catalog,
-- uniquely keyed by catalog name. The original statement was missing its
-- terminating semicolon, so the mysql client never executed it.
CREATE TABLE t_mysql_catalog (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `catalog_name` varchar(255) NOT NULL,
  `configuration` text,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `uniq_catalog_name` (`catalog_name`) USING BTREE
);
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
index db8ff46cf..d13f26c1b 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -29,7 +29,7 @@
<name>StreamPark : Flink Shims 1.18</name>
<properties>
- <flink.version>1.18.0</flink.version>
+ <flink.version>1.18.1</flink.version>
</properties>
<dependencies>