This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 180bfb23d [feature] Introduce flink jdbc catalog store plugin. (#3914)
180bfb23d is described below

commit 180bfb23dac5b48e3dbc007918f5a0028cd3332c
Author: ouyangwulin <[email protected]>
AuthorDate: Wed Jul 31 20:07:00 2024 +0800

    [feature] Introduce flink jdbc catalog store plugin. (#3914)
    
    * [catalog] Introduce flink jdbc catalog store plugin.
    
    * fixed delete where unmatched
    
    * improve jdbc close method.
    
    * [catalog] refactor project module to stream-flink.
    
    * [catalog] refactor package name
    
    * [catalog] delete unused logs
---
 .idea/vcs.xml                                      |  28 ++--
 streampark-flink/pom.xml                           |   6 +-
 .../pom.xml                                        |  98 +++++-------
 .../apache/streampark/catalog/JacksonUtils.java    |  54 +++++++
 .../streampark/catalog/JdbcCatalogStore.java       | 175 +++++++++++++++++++++
 .../catalog/JdbcCatalogStoreFactory.java           | 101 ++++++++++++
 .../catalog/JdbcCatalogStoreFactoryOptions.java    |  71 +++++++++
 .../catalog/connections/JdbcConnectionOptions.java | 153 ++++++++++++++++++
 .../connections/JdbcConnectionProvider.java        |  85 ++++++++++
 .../connections/SimpleJdbcConnectionProvider.java  | 150 ++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../catalog/JdbcCatalogStoreFactoryTest.java       |  56 +++++++
 .../streampark/catalog/JdbcCatalogStoreTest.java   | 137 ++++++++++++++++
 .../streampark/catalog/mysql/MySqlContainer.java   | 175 +++++++++++++++++++++
 .../streampark/catalog/mysql/MysqlBaseITCASE.java  |  57 +++++++
 .../src/test/resources/docker/server/my.cnf        |  64 ++++++++
 .../src/test/resources/docker/setup.sql            |  41 +++++
 .../streampark-flink-shims_flink-1.18/pom.xml      |   2 +-
 18 files changed, 1396 insertions(+), 73 deletions(-)

diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index eef8c2d27..48b645033 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -16,17 +16,17 @@
   ~ limitations under the License.
   -->
 <project version="4">
-    <component name="VcsDirectoryMappings">
-        <mapping directory="$PROJECT_DIR$" vcs="Git" />
-    </component>
-    <component name="IssueNavigationConfiguration">
-        <option name="links">
-            <list>
-                <IssueNavigationLink>
-                    <option name="issueRegexp" value="#(\d+)" />
-                    <option name="linkRegexp" value="https://github.com/apache/incubator-streampark/pull/$1" />
-                </IssueNavigationLink>
-            </list>
-        </option>
-    </component>
-</project>
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+  <component name="IssueNavigationConfiguration">
+    <option name="links">
+      <list>
+        <IssueNavigationLink>
+          <option name="issueRegexp" value="#(\d+)" />
+          <option name="linkRegexp" value="https://github.com/apache/incubator-streampark/pull/$1" />
+        </IssueNavigationLink>
+      </list>
+    </option>
+  </component>
+</project>
\ No newline at end of file
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index c3befc8cf..ea053a2e5 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -38,6 +38,7 @@
         <module>streampark-flink-packer</module>
         <module>streampark-flink-kubernetes</module>
         <module>streampark-flink-sql-gateway</module>
+        <module>streampark-flink-catalog-store</module>
     </modules>
 
     <dependencies>
@@ -62,7 +63,10 @@
             <version>${jupiter.version}</version>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
 b/streampark-flink/streampark-flink-catalog-store/pom.xml
similarity index 63%
copy from 
streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
copy to streampark-flink/streampark-flink-catalog-store/pom.xml
index db8ff46cf..78223ed55 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
+++ b/streampark-flink/streampark-flink-catalog-store/pom.xml
@@ -16,103 +16,87 @@
   ~ limitations under the License.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-
     <parent>
         <groupId>org.apache.streampark</groupId>
-        <artifactId>streampark-flink-shims</artifactId>
+        <artifactId>streampark-flink</artifactId>
         <version>2.2.0-SNAPSHOT</version>
     </parent>
 
-    
<artifactId>streampark-flink-shims_flink-1.18_${scala.binary.version}</artifactId>
-    <name>StreamPark : Flink Shims 1.18</name>
+    <artifactId>streampark-flink-catalog-store</artifactId>
+    <name>StreamPark : Flink Catalog Store</name>
 
     <properties>
-        <flink.version>1.18.0</flink.version>
+        <flink.shaded.version19>19.0</flink.shaded.version19>
+        <flink.version>1.18.1</flink.version>
+        <flink.shaded.jackson.version>2.15.3</flink.shaded.jackson.version>
     </properties>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <!--flink-->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <artifactId>flink-annotations</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <artifactId>flink-table-common</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-jackson</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <artifactId>flink-table-api-java</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-jackson</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-api-java-uber</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-jackson</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-statebackend-rocksdb</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-yarn</artifactId>
-            <version>${flink.version}</version>
+            <artifactId>flink-shaded-jackson</artifactId>
+            
<version>${flink.shaded.jackson.version}-${flink.shaded.version19}</version>
             <scope>provided</scope>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client-api</artifactId>
-            <optional>true</optional>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client-runtime</artifactId>
-            <optional>true</optional>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-kubernetes</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>1.19.0</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
@@ -132,7 +116,7 @@
                             
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
                             <artifactSet>
                                 <includes>
-                                    
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                    <include>org.apache.streampark:*</include>
                                 </includes>
                             </artifactSet>
                             <filters>
diff --git 
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
new file mode 100644
index 000000000..11cb6906e
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JacksonUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+/** Serialization utils */
+public final class JacksonUtils {
+
+    private JacksonUtils() {
+    }
+
+    private static final ObjectMapper MAPPER;
+
+    static {
+        MAPPER = new ObjectMapper();
+        MAPPER.registerModule(new SimpleModule());
+        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+        MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, 
false);
+    }
+
+    public static <T> T read(String json, Class<T> clazz) throws 
JsonProcessingException {
+        return MAPPER.readValue(json, clazz);
+    }
+
+    public static <T> T read(String json, TypeReference<T> typeReference) 
throws JsonProcessingException {
+        return MAPPER.readValue(json, typeReference);
+    }
+
+    public static String write(Object object) throws JsonProcessingException {
+        return MAPPER.writeValueAsString(object);
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
new file mode 100644
index 000000000..feeb76841
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStore.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ *  Catalog Store for Jdbc.
+ */
+public class JdbcCatalogStore extends AbstractCatalogStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcCatalogStore.class);
+    private final JdbcConnectionProvider jdbcConnectionProvider;
+    private transient Connection connection;
+    private transient Statement statement;
+    private transient ResultSet resultSet;
+
+    private final String catalogTableName;
+
+    public JdbcCatalogStore(JdbcConnectionProvider jdbcConnectionProvider, 
String catalogTableName) {
+        this.jdbcConnectionProvider = jdbcConnectionProvider;
+        this.catalogTableName = catalogTableName;
+    }
+
+    @Override
+    public void open() {
+        try {
+            this.connection = 
jdbcConnectionProvider.getOrEstablishConnection();
+            this.statement = this.connection.createStatement();
+            super.open();
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        closeResultSetAndStatement();
+        try {
+            jdbcConnectionProvider.close();
+        } catch (Exception e) {
+            throw new CatalogException(e);
+        }
+        super.close();
+    }
+
+    @Override
+    public void storeCatalog(String catalogName, CatalogDescriptor 
catalogDescriptor) throws CatalogException {
+        checkOpenState();
+        try {
+            if (!contains(catalogName)) {
+                statement.executeUpdate(String.format(
+                    "insert into %s 
(catalog_name,configuration,create_time,update_time) values 
('%s','%s',now(),now())",
+                    this.catalogTableName, catalogName,
+                    
JacksonUtils.write(catalogDescriptor.getConfiguration().toMap())));
+            } else {
+                LOG.error("catalog {} is exist.", catalogName);
+            }
+        } catch (SQLException | JsonProcessingException e) {
+            throw new CatalogException(String.format("Store catalog %s 
failed!", catalogName), e);
+        }
+    }
+
+    @Override
+    public void removeCatalog(String catalogName, boolean ignoreIfNotExists) 
throws CatalogException {
+        checkOpenState();
+        try {
+            int effectRow = statement.executeUpdate(
+                String.format("delete from %s where catalog_name='%s'", 
this.catalogTableName, catalogName));
+
+            if (effectRow == 0 && !ignoreIfNotExists) {
+                throw new CatalogException(String.format("Remove catalog %s 
failed!", catalogName));
+            }
+        } catch (SQLException e) {
+            LOG.error("Remove catalog {} failed!", catalogName, e);
+            throw new CatalogException(String.format("Remove catalog %s 
failed!", catalogName));
+        }
+    }
+
+    @Override
+    public Optional<CatalogDescriptor> getCatalog(String catalogName) throws 
CatalogException {
+        checkOpenState();
+        try {
+            resultSet = statement
+                .executeQuery(
+                    String.format("select * from %s where catalog_name='%s'", 
this.catalogTableName, catalogName));
+            while (resultSet.next()) {
+                return Optional.of(CatalogDescriptor.of(catalogName,
+                    
Configuration.fromMap(JacksonUtils.read(resultSet.getString("configuration"), 
Map.class))));
+            }
+        } catch (SQLException | JsonProcessingException e) {
+            throw new CatalogException(String.format("Get catalog %s failed!", 
catalogName), e);
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Set<String> listCatalogs() throws CatalogException {
+        checkOpenState();
+        Set<String> catalogs = new HashSet<>();
+        try {
+            resultSet = statement.executeQuery(String.format("select * from 
%s;", this.catalogTableName));
+            while (resultSet.next()) {
+                catalogs.add(resultSet.getString("catalog_name"));
+            }
+            return catalogs;
+        } catch (SQLException e) {
+            throw new CatalogException("List catalogs failed!", e);
+        }
+    }
+
+    @Override
+    public boolean contains(String catalogName) throws CatalogException {
+        checkOpenState();
+        try {
+            resultSet = statement.executeQuery(
+                String.format("select * from %s where catalog_name='%s';", 
this.catalogTableName, catalogName));
+            while (resultSet.next()) {
+                resultSet.getString("catalog_name");
+                return true;
+            }
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Catalog %s is contains 
failed!", catalogName), e);
+        }
+        return false;
+    }
+
+    private void closeResultSetAndStatement() {
+        try {
+            if (resultSet != null && !resultSet.isClosed()) {
+                resultSet.close();
+            }
+            if (statement != null && !statement.isClosed()) {
+                statement.close();
+            }
+            resultSet = null;
+            statement = null;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
new file mode 100644
index 000000000..38aa6e5c7
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
+import static 
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.DRIVER;
+import static 
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.IDENTIFIER;
+import static 
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.MAX_RETRY_TIMEOUT;
+import static 
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.PASSWORD;
+import static 
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.TABLE_NAME;
+import static org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.URL;
+import static 
org.apache.streampark.catalog.JdbcCatalogStoreFactoryOptions.USERNAME;
+
+/**
+ * Catalog Store Factory for Jdbc.
+ *
+ * <p>{@link #open(Context)} validates the options, builds the JDBC connection options and creates
+ * the connection provider; {@link #createCatalogStore()} then hands that provider and the
+ * configured table name to a new {@link JdbcCatalogStore}.
+ */
+public class JdbcCatalogStoreFactory implements CatalogStoreFactory {
+
+    /** Created in {@link #open(Context)}; stays {@code null} if open was never called or failed. */
+    private JdbcConnectionProvider jdbcConnectionProvider;
+    private transient String catalogTableName;
+
+    /**
+     * Creates a store backed by the provider and table name set up in {@link #open(Context)}.
+     *
+     * @return a new {@link JdbcCatalogStore}
+     */
+    @Override
+    public CatalogStore createCatalogStore() {
+        return new JdbcCatalogStore(jdbcConnectionProvider, this.catalogTableName);
+    }
+
+    /**
+     * Validates the factory options and prepares the connection provider.
+     *
+     * @param context factory context carrying the catalog store options
+     */
+    @Override
+    public void open(Context context) {
+        FactoryUtil.FactoryHelper<CatalogStoreFactory> factoryHelper =
+            createCatalogStoreFactoryHelper(this, context);
+        factoryHelper.validate();
+
+        ReadableConfig options = factoryHelper.getOptions();
+        JdbcConnectionOptions jdbcConnectionOptions =
+            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withUrl(options.get(URL))
+                .withDriverName(options.get(DRIVER))
+                .withUsername(options.get(USERNAME))
+                .withPassword(options.get(PASSWORD))
+                // The 'max-retry-timeout' option is used as the connection check timeout in seconds.
+                .withConnectionCheckTimeoutSeconds(options.get(MAX_RETRY_TIMEOUT))
+                .build();
+
+        this.catalogTableName = options.get(TABLE_NAME);
+        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectionOptions);
+    }
+
+    /**
+     * Closes the provider's connection. Does nothing when {@link #open(Context)} never assigned a
+     * provider (for example because option validation failed), so cleanup cannot throw a
+     * {@link NullPointerException} that would mask the original error.
+     */
+    @Override
+    public void close() {
+        if (this.jdbcConnectionProvider != null) {
+            this.jdbcConnectionProvider.closeConnection();
+        }
+    }
+
+    /** @return the identifier under which this factory is discovered */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** @return the options that must be present: url, driver, username, password and table name */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(URL);
+        options.add(DRIVER);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(TABLE_NAME);
+        return Collections.unmodifiableSet(options);
+    }
+
+    /** @return the options that may be omitted: the max retry timeout */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(MAX_RETRY_TIMEOUT);
+        return Collections.unmodifiableSet(options);
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
new file mode 100644
index 000000000..85a324bb5
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryOptions.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Catalog Store Options for Jdbc.
+ *
+ * <p>Holds the factory identifier and the {@link ConfigOption} definitions read by
+ * {@code JdbcCatalogStoreFactory}. The class only carries constants and cannot be instantiated.
+ */
+public class JdbcCatalogStoreFactoryOptions {
+
+    /** Identifier returned by {@code JdbcCatalogStoreFactory#factoryIdentifier()}. */
+    public static final String IDENTIFIER = "jdbc";
+
+    /** JDBC url of the database; listed as a required option by the factory. */
+    public static final ConfigOption<String> URL =
+        ConfigOptions.key("url")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "The JDBC database url.");
+    /** Name of the table the catalog store reads and writes; listed as a required option by the factory. */
+    public static final ConfigOption<String> TABLE_NAME =
+        ConfigOptions.key("table-name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "The name of JDBC table to connect.");
+    /**
+     * JDBC driver class name.
+     *
+     * <p>NOTE(review): the description says the driver is derived from the URL when unset, but the
+     * factory lists this option as required - confirm which behavior is intended.
+     */
+    public static final ConfigOption<String> DRIVER =
+        ConfigOptions.key("driver")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "The class name of the JDBC driver to use to connect to this 
URL, if not set, it will automatically be derived from the URL.");
+    /**
+     * JDBC user name.
+     *
+     * <p>NOTE(review): the description reads as if the credentials were optional, but the factory
+     * lists both 'username' and 'password' as required - confirm.
+     */
+    public static final ConfigOption<String> USERNAME =
+        ConfigOptions.key("username")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "The JDBC user name. 'username' and 'password' must both be 
specified if any of them is specified.");
+
+    /** JDBC password; listed as a required option by the factory. */
+    public static final ConfigOption<String> PASSWORD =
+        ConfigOptions.key("password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription(
+                "The JDBC password.");
+
+    /**
+     * Timeout in seconds, default 60; the only optional option of the factory. The factory passes
+     * the value to the connection options as the connection check timeout, which rejects values
+     * smaller than 1.
+     */
+    public static final ConfigOption<Integer> MAX_RETRY_TIMEOUT =
+        ConfigOptions.key("max-retry-timeout")
+            .intType()
+            .defaultValue(60)
+            .withDescription(
+                "Maximum timeout between retries. The timeout should be in 
second granularity and shouldn't be smaller than 1 second.");
+
+    /** Constants holder; not instantiable. */
+    private JdbcCatalogStoreFactoryOptions() {
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
new file mode 100644
index 000000000..62f460d4e
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionOptions.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+public class JdbcConnectionOptions implements Serializable {
+
+    public static final String USER_KEY = "user";
+    public static final String PASSWORD_KEY = "password";
+
+    private static final long serialVersionUID = 1L;
+
+    protected final String url;
+    @Nullable
+    protected final String driverName;
+    protected final int connectionCheckTimeoutSeconds;
+    @Nonnull
+    protected final Properties properties;
+
+    protected JdbcConnectionOptions(
+                                    String url,
+                                    @Nullable String driverName,
+                                    int connectionCheckTimeoutSeconds,
+                                    @Nonnull Properties properties) {
+        Preconditions.checkArgument(
+            connectionCheckTimeoutSeconds > 0,
+            "Connection check timeout seconds shouldn't be smaller than 1");
+        this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
+        this.driverName = driverName;
+        this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+        this.properties =
+            Preconditions.checkNotNull(properties, "Connection properties must 
be non-null");
+    }
+
+    public String getDbURL() {
+        return url;
+    }
+
+    @Nullable
+    public String getDriverName() {
+        return driverName;
+    }
+
+    public Optional<String> getUsername() {
+        return Optional.ofNullable(properties.getProperty(USER_KEY));
+    }
+
+    public Optional<String> getPassword() {
+        return Optional.ofNullable(properties.getProperty(PASSWORD_KEY));
+    }
+
+    public int getConnectionCheckTimeoutSeconds() {
+        return connectionCheckTimeoutSeconds;
+    }
+
+    @Nonnull
+    public Properties getProperties() {
+        return properties;
+    }
+
+    @Nonnull
+    public static Properties getBriefAuthProperties(String user, String 
password) {
+        final Properties result = new Properties();
+        if (Objects.nonNull(user)) {
+            result.put(USER_KEY, user);
+        }
+        if (Objects.nonNull(password)) {
+            result.put(PASSWORD_KEY, password);
+        }
+        return result;
+    }
+
+    /** Builder for {@link JdbcConnectionOptions}. */
+    public static class JdbcConnectionOptionsBuilder {
+
+        private String url;
+        private String driverName;
+        private int connectionCheckTimeoutSeconds = 60;
+        private final Properties properties = new Properties();
+
+        public JdbcConnectionOptionsBuilder withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
+            this.driverName = driverName;
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withProperty(String propKey, 
String propVal) {
+            Preconditions.checkNotNull(propKey, "Connection property key 
mustn't be null");
+            Preconditions.checkNotNull(propVal, "Connection property value 
mustn't be null");
+            this.properties.put(propKey, propVal);
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withUsername(String username) {
+            if (Objects.nonNull(username)) {
+                this.properties.put(USER_KEY, username);
+            }
+            return this;
+        }
+
+        public JdbcConnectionOptionsBuilder withPassword(String password) {
+            if (Objects.nonNull(password)) {
+                this.properties.put(PASSWORD_KEY, password);
+            }
+            return this;
+        }
+
+        /**
+         * Set the maximum timeout between retries, default is 60 seconds.
+         *
+         * @param connectionCheckTimeoutSeconds the timeout seconds, shouldn't 
smaller than 1
+         *     second.
+         */
+        public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(
+                                                                              
int connectionCheckTimeoutSeconds) {
+            this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+            return this;
+        }
+
+        public JdbcConnectionOptions build() {
+            return new JdbcConnectionOptions(
+                url, driverName, connectionCheckTimeoutSeconds, properties);
+        }
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
new file mode 100644
index 000000000..c49cb1fc7
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/JdbcConnectionProvider.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/** JDBC connection provider. */
+@PublicEvolving
+public interface JdbcConnectionProvider extends Serializable, AutoCloseable {
+
+    /**
+     * Get existing connection.
+     *
+     * @return existing connection, or null if there is none
+     */
+    @Nullable
+    Connection getConnection();
+
+    /**
+     * Get existing connection properties.
+     *
+     * <p>The default implementation does not supply any properties and always throws.
+     *
+     * @return existing connection properties
+     * @throws UnsupportedOperationException if the implementation does not override this method
+     */
+    @Nonnull
+    default Properties getProperties() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Check whether possible existing connection is valid or not through {@link
+     * Connection#isValid(int)}.
+     *
+     * @return true if existing connection is valid
+     * @throws SQLException sql exception thrown from {@link Connection#isValid(int)}
+     */
+    boolean isConnectionValid() throws SQLException;
+
+    /**
+     * Get existing connection or establish a new one if there is none.
+     *
+     * @return existing connection or newly established connection
+     * @throws SQLException sql exception
+     * @throws ClassNotFoundException driver class not found
+     */
+    Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
+
+    /** Close possible existing connection. */
+    void closeConnection();
+
+    /**
+     * Close possible existing connection and establish a new one.
+     *
+     * @return newly established connection
+     * @throws SQLException sql exception
+     * @throws ClassNotFoundException driver class not found
+     */
+    Connection reestablishConnection() throws SQLException, ClassNotFoundException;
+
+    /**
+     * Closes this provider by delegating to {@link #closeConnection()}, which allows it to be
+     * used in a try-with-resources statement.
+     */
+    @Override
+    default void close() throws Exception {
+        closeConnection();
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
new file mode 100644
index 000000000..e78c0bfbe
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/java/org/apache/streampark/catalog/connections/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.connections;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/**
+ * Simple JDBC connection provider that caches a single connection created from the supplied
+ * {@link JdbcConnectionOptions} and re-creates it when it is missing or no longer valid.
+ */
+@NotThreadSafe
+@PublicEvolving
+public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final JdbcConnectionOptions jdbcOptions;
+
+    // Neither the driver nor the connection is serialized; both are re-created on demand.
+    private transient Driver loadedDriver;
+    private transient Connection connection;
+
+    static {
+        // Load DriverManager first to avoid deadlock between DriverManager's
+        // static initialization block and specific driver class's static
+        // initialization block when two different driver classes are loading
+        // concurrently using Class.forName while DriverManager is uninitialized
+        // before.
+        //
+        // This could happen in JDK 8 but not above as driver loading has been
+        // moved out of DriverManager's static initialization block since JDK 9.
+        DriverManager.getDrivers();
+    }
+
+    /**
+     * Creates a provider for the given options. No connection is opened here.
+     *
+     * @param jdbcOptions the url, driver name, timeout and properties used to connect
+     */
+    public SimpleJdbcConnectionProvider(JdbcConnectionOptions jdbcOptions) {
+        this.jdbcOptions = jdbcOptions;
+    }
+
+    /** Returns the cached connection, or null when none has been established. */
+    @Override
+    public Connection getConnection() {
+        return connection;
+    }
+
+    /** Returns the connection properties of the underlying options. */
+    @Nonnull
+    @Override
+    public Properties getProperties() {
+        return jdbcOptions.getProperties();
+    }
+
+    /**
+     * Checks the cached connection with {@link Connection#isValid(int)}, using the configured
+     * connection check timeout.
+     *
+     * @return true if a connection is cached and reported valid, false otherwise
+     * @throws SQLException if the validity check fails
+     */
+    @Override
+    public boolean isConnectionValid() throws SQLException {
+        return connection != null
+            && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
+    }
+
+    /**
+     * Finds the driver with the given class name among the drivers registered with {@link
+     * DriverManager}, or instantiates it through the thread context class loader.
+     *
+     * @param driverName the fully qualified driver class name, must not be null
+     * @return the matching driver
+     * @throws SQLException if the driver class cannot be instantiated
+     * @throws ClassNotFoundException if the driver class cannot be found
+     */
+    private Driver loadDriver(String driverName) throws SQLException, ClassNotFoundException {
+        Preconditions.checkNotNull(driverName);
+        Enumeration<Driver> drivers = DriverManager.getDrivers();
+        while (drivers.hasMoreElements()) {
+            Driver driver = drivers.nextElement();
+            if (driver.getClass().getName().equals(driverName)) {
+                return driver;
+            }
+        }
+        // We could reach here for reasons:
+        // * Class loader hell of DriverManager(see JDK-8146872).
+        // * driver is not installed as a service provider.
+        Class<?> clazz =
+            Class.forName(driverName, true, Thread.currentThread().getContextClassLoader());
+        try {
+            // Class#newInstance() is deprecated since Java 9; go through the no-arg constructor.
+            return (Driver) clazz.getDeclaredConstructor().newInstance();
+        } catch (Exception ex) {
+            throw new SQLException("Fail to create driver of class " + driverName, ex);
+        }
+    }
+
+    /** Returns the driver named in the options, loading and caching it on first use. */
+    private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
+        if (loadedDriver == null) {
+            loadedDriver = loadDriver(jdbcOptions.getDriverName());
+        }
+        return loadedDriver;
+    }
+
+    /**
+     * Returns the cached connection when it is still valid, otherwise opens a new one. With no
+     * driver name configured the connection is obtained from {@link DriverManager}; otherwise
+     * the configured driver is used directly.
+     *
+     * @return a valid cached connection or a newly established one
+     * @throws SQLException if the connection cannot be validated or established
+     * @throws ClassNotFoundException if the configured driver class cannot be found
+     */
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
+        if (isConnectionValid()) {
+            return connection;
+        }
+        // A cached but invalid connection is about to be replaced; release it first so that
+        // it is not leaked. This is a no-op when nothing is cached.
+        closeConnection();
+        if (jdbcOptions.getDriverName() == null) {
+            connection = DriverManager.getConnection(jdbcOptions.getDbURL(), getProperties());
+        } else {
+            Driver driver = getLoadedDriver();
+            connection = driver.connect(jdbcOptions.getDbURL(), getProperties());
+            if (connection == null) {
+                // Throw same exception as DriverManager.getConnection when no driver found to match
+                // caller expectation.
+                throw new SQLException(
+                    "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
+            }
+        }
+        return connection;
+    }
+
+    /**
+     * Closes the cached connection, if any. A failure to close is logged as a warning and not
+     * rethrown; the cached reference is cleared in either case.
+     */
+    @Override
+    public void closeConnection() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                LOG.warn("JDBC connection close failed.", e);
+            } finally {
+                connection = null;
+            }
+        }
+    }
+
+    /** Closes the cached connection, if any, and then establishes a new one. */
+    @Override
+    public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
+        closeConnection();
+        return getOrEstablishConnection();
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..9bcdf4448
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.streampark.catalog.JdbcCatalogStoreFactory
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
new file mode 100644
index 000000000..59842e1b4
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreFactoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
+
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.factories.CatalogStoreFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Checks that the JDBC catalog store factory can be discovered by its identifier and that it
+ * creates a {@link JdbcCatalogStore}.
+ */
+public class JdbcCatalogStoreFactoryTest extends MysqlBaseITCASE {
+
+    @org.junit.Test
+    public void testCatalogStoreFactoryDiscovery() {
+
+        String factoryIdentifier = JdbcCatalogStoreFactoryOptions.IDENTIFIER;
+        Map<String, String> options = new HashMap<>();
+        options.put("url", MYSQL_CONTAINER.getJdbcUrl());
+        options.put("table-name", "t_mysql_catalog");
+        options.put("driver", MYSQL_CONTAINER.getDriverClassName());
+        options.put("username", MYSQL_CONTAINER.getUsername());
+        options.put("password", MYSQL_CONTAINER.getPassword());
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        final FactoryUtil.DefaultCatalogStoreContext discoveryContext =
+            new FactoryUtil.DefaultCatalogStoreContext(options, null, classLoader);
+        final CatalogStoreFactory factory =
+            FactoryUtil.discoverFactory(
+                classLoader, CatalogStoreFactory.class, factoryIdentifier);
+        factory.open(discoveryContext);
+
+        // Close the factory even when the assertion fails so that it does not stay open.
+        try {
+            CatalogStore catalogStore = factory.createCatalogStore();
+            // isInstanceOf reports the actual type on failure, unlike a bare boolean check.
+            assertThat(catalogStore).isInstanceOf(JdbcCatalogStore.class);
+        } finally {
+            factory.close();
+        }
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
new file mode 100644
index 000000000..626768863
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/JdbcCatalogStoreTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog;
+
+import org.apache.streampark.catalog.connections.JdbcConnectionOptions;
+import org.apache.streampark.catalog.connections.JdbcConnectionProvider;
+import org.apache.streampark.catalog.connections.SimpleJdbcConnectionProvider;
+import org.apache.streampark.catalog.mysql.MysqlBaseITCASE;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogStore;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.assertj.core.api.ThrowableAssert;
+
+import java.util.Set;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link JdbcCatalogStore} running against the MySQL test container. */
+public class JdbcCatalogStoreTest extends MysqlBaseITCASE {
+
+    private static final String DUMMY = "dummy";
+    private static final CatalogDescriptor DUMMY_CATALOG;
+
+    static {
+        Configuration conf = new Configuration();
+        conf.set(CommonCatalogOptions.CATALOG_TYPE, DUMMY);
+        conf.set(GenericInMemoryCatalogFactoryOptions.DEFAULT_DATABASE, "dummy_db");
+
+        DUMMY_CATALOG = CatalogDescriptor.of(DUMMY, conf);
+    }
+
+    /** Every operation must be rejected while the store has not been opened. */
+    @org.junit.Test
+    public void testNotOpened() {
+        CatalogStore catalogStore = initCatalogStore();
+
+        assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+        assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG));
+        assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true));
+    }
+
+    /** A stored catalog must show up in the listing. */
+    @org.junit.Test
+    public void testStore() {
+        CatalogStore catalogStore = initCatalogStore();
+        catalogStore.open();
+        // Close the store in all cases so that the test does not leave it open.
+        try {
+            catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+
+            Set<String> storedCatalogs = catalogStore.listCatalogs();
+            assertThat(storedCatalogs.size()).isEqualTo(1);
+            assertThat(storedCatalogs.contains(DUMMY)).isTrue();
+        } finally {
+            catalogStore.close();
+        }
+    }
+
+    /** Removing a stored catalog must make it disappear from the store. */
+    @org.junit.Test
+    public void testRemoveExisting() {
+        CatalogStore catalogStore = initCatalogStore();
+        catalogStore.open();
+        try {
+            catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+            assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
+
+            catalogStore.removeCatalog(DUMMY, false);
+            assertThat(catalogStore.listCatalogs().size()).isEqualTo(0);
+            assertThat(catalogStore.contains(DUMMY)).isFalse();
+        } finally {
+            catalogStore.close();
+        }
+    }
+
+    /** Removing an absent catalog must fail unless ignoreIfNotExists is set. */
+    @org.junit.Test
+    public void testRemoveNonExisting() {
+        CatalogStore catalogStore = initCatalogStore();
+        catalogStore.open();
+        try {
+            catalogStore.removeCatalog(DUMMY, true);
+
+            assertThatThrownBy(() -> catalogStore.removeCatalog(DUMMY, false))
+                .isInstanceOf(CatalogException.class)
+                .hasMessageContaining(
+                    "Remove catalog " + DUMMY + " failed!");
+        } finally {
+            catalogStore.close();
+        }
+    }
+
+    /** After close() the store must behave as if it had never been opened. */
+    @org.junit.Test
+    public void testClosed() {
+        CatalogStore catalogStore = initCatalogStore();
+
+        catalogStore.open();
+
+        catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG);
+        assertThat(catalogStore.listCatalogs().size()).isEqualTo(1);
+
+        catalogStore.close();
+
+        assertCatalogStoreNotOpened(catalogStore::listCatalogs);
+        assertCatalogStoreNotOpened(() -> catalogStore.contains(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.getCatalog(DUMMY));
+        assertCatalogStoreNotOpened(() -> catalogStore.storeCatalog(DUMMY, DUMMY_CATALOG));
+        assertCatalogStoreNotOpened(() -> catalogStore.removeCatalog(DUMMY, true));
+    }
+
+    /** Asserts that the call fails with the "not opened" IllegalStateException. */
+    private void assertCatalogStoreNotOpened(
+                                             ThrowableAssert.ThrowingCallable shouldRaiseThrowable) {
+        assertThatThrownBy(shouldRaiseThrowable)
+            .isInstanceOf(IllegalStateException.class)
+            .hasMessageContaining("CatalogStore is not opened yet.");
+    }
+
+    /** Creates an unopened store that targets the t_mysql_catalog table of the container. */
+    private JdbcCatalogStore initCatalogStore() {
+        JdbcConnectionOptions jdbcConnectionOptions =
+            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withUrl(MYSQL_CONTAINER.getJdbcUrl())
+                .withDriverName(MYSQL_CONTAINER.getDriverClassName())
+                .withUsername(MYSQL_CONTAINER.getUsername())
+                .withPassword(MYSQL_CONTAINER.getPassword())
+                .build();
+
+        JdbcConnectionProvider jdbcConnectionProvider =
+            new SimpleJdbcConnectionProvider(jdbcConnectionOptions);
+        return new JdbcCatalogStore(jdbcConnectionProvider, "t_mysql_catalog");
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
new file mode 100644
index 000000000..94a5c2f7e
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MySqlContainer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.mysql;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Testcontainers wrapper around the MySQL docker image that supports an optional my.cnf
+ * override and an optional setup SQL script, both passed in as container parameters.
+ */
+public class MySqlContainer extends JdbcDatabaseContainer {
+
+    public static final String IMAGE = "mysql";
+
+    public static final String MYSQL_VERSION = "8.0";
+    public static final Integer MYSQL_PORT = 3306;
+
+    private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+    private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+    private static final String MYSQL_ROOT_USER = "root";
+
+    private String databaseName = "test";
+    private String username = "test";
+    private String password = "test";
+
+    /** Creates a container for the default image version {@link #MYSQL_VERSION}. */
+    public MySqlContainer() {
+        this(MYSQL_VERSION);
+    }
+
+    /**
+     * Creates a container for the given image version and exposes the MySQL port.
+     *
+     * @param version the tag of the {@code mysql} image to run
+     */
+    public MySqlContainer(String version) {
+        super(DockerImageName.parse(IMAGE + ":" + version));
+        addExposedPort(MYSQL_PORT);
+    }
+
+    /**
+     * Returns the host port that the MySQL port is mapped to.
+     *
+     * @return a set containing exactly the mapped MySQL port
+     */
+    @Override
+    protected Set<Integer> getLivenessCheckPorts() {
+        // Do not pass the port to the HashSet constructor: HashSet(int) takes an initial
+        // capacity, which would yield an empty set instead of a set holding the port.
+        Set<Integer> livenessPorts = new HashSet<>();
+        livenessPorts.add(getMappedPort(MYSQL_PORT));
+        return livenessPorts;
+    }
+
+    /**
+     * Maps the optional configuration and setup resources into the container and sets the
+     * MYSQL_* environment variables for database, user and password.
+     *
+     * @throws ContainerLaunchException if the password is empty and the user is not root
+     */
+    @Override
+    protected void configure() {
+        optionallyMapResourceParameterAsVolume(
+            MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
+
+        if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+            optionallyMapResourceParameterAsVolume(
+                SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+        }
+
+        addEnv("MYSQL_DATABASE", databaseName);
+        addEnv("MYSQL_USER", username);
+        if (password != null && !password.isEmpty()) {
+            // The same password is used for the configured user and for root.
+            addEnv("MYSQL_PASSWORD", password);
+            addEnv("MYSQL_ROOT_PASSWORD", password);
+        } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+            addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+        } else {
+            throw new ContainerLaunchException(
+                "Empty password can be used only with the root user");
+        }
+        setStartupAttempts(3);
+    }
+
+    /**
+     * Returns {@code com.mysql.cj.jdbc.Driver} when that class can be loaded, otherwise falls
+     * back to {@code com.mysql.jdbc.Driver}.
+     */
+    @Override
+    public String getDriverClassName() {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            return "com.mysql.cj.jdbc.Driver";
+        } catch (ClassNotFoundException e) {
+            return "com.mysql.jdbc.Driver";
+        }
+    }
+
+    /**
+     * Builds the JDBC url for the given database on this container.
+     *
+     * @param databaseName the database to connect to
+     * @return the url made of host, mapped port, database name and the url parameters
+     */
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:mysql://"
+            + getHost()
+            + ":"
+            + getDatabasePort()
+            + "/"
+            + databaseName
+            + additionalUrlParams;
+    }
+
+    /** Returns the JDBC url for the configured database name. */
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl(databaseName);
+    }
+
+    /** Returns the host port that the MySQL port is mapped to. */
+    public int getDatabasePort() {
+        return getMappedPort(MYSQL_PORT);
+    }
+
+    /**
+     * Extends the url built by the parent class with {@code useSSL=false} and {@code
+     * allowPublicKeyRetrieval=true} unless the url already sets those parameters.
+     */
+    @Override
+    protected String constructUrlForConnection(String queryString) {
+        String url = super.constructUrlForConnection(queryString);
+
+        if (!url.contains("useSSL=")) {
+            String separator = url.contains("?") ? "&" : "?";
+            url = url + separator + "useSSL=false";
+        }
+
+        if (!url.contains("allowPublicKeyRetrieval=")) {
+            url = url + "&allowPublicKeyRetrieval=true";
+        }
+
+        return url;
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+
+    /**
+     * Registers the resource to be used as the MySQL configuration override.
+     *
+     * @param s the resource path stored under the MY_CNF parameter
+     * @return this container
+     */
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withConfigurationOverride(String s) {
+        parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+        return this;
+    }
+
+    /**
+     * Registers the SQL script to be mapped into the container's init directory.
+     *
+     * @param sqlPath the resource path stored under the SETUP_SQL parameter
+     * @return this container
+     */
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withSetupSQL(String sqlPath) {
+        parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withDatabaseName(final String databaseName) {
+        this.databaseName = databaseName;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withUsername(final String username) {
+        this.username = username;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withPassword(final String password) {
+        this.password = password;
+        return this;
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
new file mode 100644
index 000000000..799335b96
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/java/org/apache/streampark/catalog/mysql/MysqlBaseITCASE.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.catalog.mysql;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.stream.Stream;
+
+public class MysqlBaseITCASE {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MysqlBaseITCASE.class);
+    protected static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer("docker/server/my.cnf");
+
+    @org.junit.BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @org.junit.AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.stop();
+        }
+        LOG.info("Containers are stopped.");
+    }
+
+    protected static MySqlContainer createMySqlContainer(String configPath) {
+        return (MySqlContainer) new 
MySqlContainer(MySqlContainer.MYSQL_VERSION)
+            .withConfigurationOverride(configPath)
+            .withSetupSQL("docker/setup.sql")
+            .withDatabaseName("flink-test")
+            .withUsername("flinkuser")
+            .withPassword("flinkpw")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+}
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
new file mode 100644
index 000000000..953ffe8f9
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/server/my.cnf
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems.
+server-id         = 223344
+log_bin           = mysql-bin
+binlog_format     = row
+# Set binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception
+# message when the binlog expires in the server.
+binlog_expire_logs_seconds = 1
+max_binlog_size   = 4096
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
new file mode 100644
index 000000000..1b22107de
--- /dev/null
+++ b/streampark-flink/streampark-flink-catalog-store/src/test/resources/docker/setup.sql
@@ -0,0 +1,41 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+-- 
+--      http://www.apache.org/licenses/LICENSE-2.0
+-- 
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly restrict the replication user to the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+-- NOTE(review): 'flinkuser' and the `flink-test` database are not created in this script;
+-- presumably the container creates them from its configured username/database name - confirm.
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  emptydb
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE emptydb;
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  flink-test
+-- ----------------------------------------------------------------------------------------------------------------
+-- The name contains a hyphen, so it is backtick-quoted: unquoted it is only accepted by the
+-- mysql command-line client's built-in `use` command, not as a regular SQL statement.
+USE `flink-test`;
+
+-- Catalog table used by the tests; catalog_name is unique, so each catalog has at most one row.
+CREATE TABLE t_mysql_catalog (
+   `id` bigint NOT NULL AUTO_INCREMENT,
+   `catalog_name` varchar(255) NOT NULL,
+   `configuration` text,
+   `create_time` datetime DEFAULT NULL,
+   `update_time` datetime DEFAULT NULL,
+   PRIMARY KEY (`id`) USING BTREE,
+   UNIQUE INDEX `uniq_catalog_name` (`catalog_name`) USING BTREE
+);
+
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
index db8ff46cf..d13f26c1b 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.18/pom.xml
@@ -29,7 +29,7 @@
     <name>StreamPark : Flink Shims 1.18</name>
 
     <properties>
-        <flink.version>1.18.0</flink.version>
+        <flink.version>1.18.1</flink.version>
     </properties>
 
     <dependencies>

Reply via email to