This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 78c5fcc6ac Add mysql registry plugin (#10406)
78c5fcc6ac is described below

commit 78c5fcc6acd025c2ecdc736f8511c334c1e90c96
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jun 13 11:24:42 2022 +0800

    Add mysql registry plugin (#10406)
    
    * Add mysql registry plugin
---
 dolphinscheduler-api/pom.xml                       |   2 +-
 dolphinscheduler-master/pom.xml                    |   2 +-
 .../{ => dolphinscheduler-registry-all}/pom.xml    |  30 +-
 .../dolphinscheduler-registry-mysql/README.md      |  36 +++
 .../{ => dolphinscheduler-registry-mysql}/pom.xml  |  36 ++-
 .../plugin/registry/mysql/MysqlOperator.java       | 326 +++++++++++++++++++++
 .../plugin/registry/mysql/MysqlRegistry.java       | 177 +++++++++++
 .../registry/mysql/MysqlRegistryConstant.java      |  31 ++
 .../registry/mysql/MysqlRegistryProperties.java    |  47 +++
 .../plugin/registry/mysql/model/DataType.java      |  33 +++
 .../registry/mysql/model/MysqlRegistryData.java    |  40 +++
 .../registry/mysql/model/MysqlRegistryLock.java    |  54 ++++
 .../registry/mysql/task/EphemeralDateManager.java  | 162 ++++++++++
 .../registry/mysql/task/RegistryLockManager.java   | 137 +++++++++
 .../registry/mysql/task/SubscribeDataManager.java  | 156 ++++++++++
 .../src/main/resources/mysql_registry_init.sql     |  47 +++
 .../registry/zookeeper/ZookeeperRegistry.java      |   6 +-
 .../zookeeper/ZookeeperRegistryProperties.java}    |  46 +--
 .../registry/zookeeper/ZookeeperRegistryTest.java  |   3 +-
 .../dolphinscheduler-registry-plugins/pom.xml      |   3 +-
 dolphinscheduler-registry/pom.xml                  |   1 +
 dolphinscheduler-worker/pom.xml                    |   2 +-
 pom.xml                                            |  17 ++
 23 files changed, 1326 insertions(+), 68 deletions(-)

diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index 6007ac1d9e..119dbe1b84 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -62,7 +62,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+            <artifactId>dolphinscheduler-registry-all</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
diff --git a/dolphinscheduler-master/pom.xml b/dolphinscheduler-master/pom.xml
index f21d281ccd..fadde209a9 100644
--- a/dolphinscheduler-master/pom.xml
+++ b/dolphinscheduler-master/pom.xml
@@ -49,7 +49,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+            <artifactId>dolphinscheduler-registry-all</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
diff --git a/dolphinscheduler-registry/pom.xml 
b/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
similarity index 64%
copy from dolphinscheduler-registry/pom.xml
copy to dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
index 79a44079f8..de853f0eed 100644
--- a/dolphinscheduler-registry/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
@@ -19,34 +19,24 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>dolphinscheduler</artifactId>
         <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-registry</artifactId>
         <version>dev-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>dolphinscheduler-registry</artifactId>
-    <packaging>pom</packaging>
+
+    <artifactId>dolphinscheduler-registry-all</artifactId>
 
     <dependencies>
         <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-context</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-autoconfigure</artifactId>
-            <scope>provided</scope>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>javax.annotation</groupId>
-            <artifactId>javax.annotation-api</artifactId>
-            <scope>provided</scope>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-registry-mysql</artifactId>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
-
-    <modules>
-        <module>dolphinscheduler-registry-api</module>
-        <module>dolphinscheduler-registry-plugins</module>
-    </modules>
-</project>
+</project>
\ No newline at end of file
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
new file mode 100644
index 0000000000..a7a68a3d45
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
@@ -0,0 +1,36 @@
+# Introduction
+
+This module is the mysql registry plugin. It uses mysql as the registry center.
+
+# How to use
+
+To use mysql as the registry center, complete the following two steps:
+
+1. Initialize the mysql table
+
+You can directly execute the sql script 
`src/main/resources/mysql_registry_init.sql`.
+
+2. Open the config
+
+You need to set the registry properties in the master/worker/api's application.yml
+
+```yaml
+registry:
+  type: mysql
+  term-refresh-interval: 2s
+  term-expire-times: 3
+  hikari-config:
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
+    username: root
+    password: root
+    maximum-pool-size: 5
+    connection-timeout: 9000
+    idle-timeout: 600000
+```
+
+After completing these two steps, you can start your DolphinScheduler cluster. Your cluster will use mysql as the registry center to
+store server metadata.
+
+NOTE: You need to add `mysql-connector-java.jar` to the DS classpath, since this plugin does not bundle the driver in the distribution.
+For details, see <a href="https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/pseudo-cluster.html">Initialize the Database</a>.
\ No newline at end of file
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
similarity index 58%
copy from dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
copy to 
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
index 1defc2d294..150b058354 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
@@ -17,20 +17,40 @@
   ~ specific language governing permissions and limitations
   ~ under the License.
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>dolphinscheduler-registry</artifactId>
         <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-registry-plugins</artifactId>
         <version>dev-SNAPSHOT</version>
     </parent>
-    <artifactId>dolphinscheduler-registry-plugins</artifactId>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <modules>
-        <module>dolphinscheduler-registry-zookeeper</module>
-    </modules>
-</project>
+    <artifactId>dolphinscheduler-registry-mysql</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-registry-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
new file mode 100644
index 0000000000..d5dd507f92
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+/**
+ * Used to CRUD from mysql
+ */
+public class MysqlOperator implements AutoCloseable {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MysqlOperator.class);
+
+    private final HikariDataSource dataSource;
+    private final long expireTimeWindow;
+
+    public MysqlOperator(MysqlRegistryProperties registryProperties) {
+        this.expireTimeWindow = registryProperties.getTermExpireTimes() * 
registryProperties.getTermRefreshInterval().toMillis();
+
+        HikariConfig hikariConfig = registryProperties.getHikariConfig();
+        hikariConfig.setPoolName("MysqlRegistryDataSourcePool");
+
+        this.dataSource = new HikariDataSource(hikariConfig);
+    }
+
+    public void healthCheck() throws SQLException {
+        String sql = "select 1 from t_ds_mysql_registry_data";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
+             ResultSet resultSet = preparedStatement.executeQuery();) {
+            // if no exception, the healthCheck success
+        }
+    }
+
+    public List<MysqlRegistryData> queryAllMysqlRegistryData() throws 
SQLException {
+        String sql = "select id, `key`, data, type, create_time, 
last_update_time from t_ds_mysql_registry_data";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            List<MysqlRegistryData> result = new 
ArrayList<>(resultSet.getFetchSize());
+            while (resultSet.next()) {
+                MysqlRegistryData mysqlRegistryData = 
MysqlRegistryData.builder()
+                        .id(resultSet.getLong("id"))
+                        .key(resultSet.getString("key"))
+                        .data(resultSet.getString("data"))
+                        .type(resultSet.getInt("type"))
+                        .createTime(resultSet.getTimestamp("create_time"))
+                        
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
+                        .build();
+                result.add(mysqlRegistryData);
+            }
+            return result;
+        }
+    }
+
+    public long insertOrUpdateEphemeralData(String key, String value) throws 
SQLException {
+        String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, 
create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, 
current_timestamp)" +
+                "ON DUPLICATE KEY UPDATE data=?, 
last_update_time=current_timestamp";
+        // put a ephemeralData
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+            preparedStatement.setString(1, key);
+            preparedStatement.setString(2, value);
+            preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
+            preparedStatement.setString(4, value);
+            int insertCount = preparedStatement.executeUpdate();
+            ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
+            if (insertCount < 1 || !generatedKeys.next()) {
+                throw new SQLException("Insert or update ephemeral data 
error");
+            }
+            return generatedKeys.getLong(1);
+        }
+    }
+
+    public long insertOrUpdatePersistentData(String key, String value) throws 
SQLException {
+        String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, 
create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, 
current_timestamp)" +
+                "ON DUPLICATE KEY UPDATE data=?, 
last_update_time=current_timestamp";
+        // put a persistent Data
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+            preparedStatement.setString(1, key);
+            preparedStatement.setString(2, value);
+            preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
+            preparedStatement.setString(4, value);
+            int insertCount = preparedStatement.executeUpdate();
+            ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
+            if (insertCount < 1 || !generatedKeys.next()) {
+                throw new SQLException("Insert or update persistent data 
error");
+            }
+            return generatedKeys.getLong(1);
+        }
+    }
+
+    public void deleteEphemeralData(String key) throws SQLException {
+        String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and 
type = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key);
+            preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
+            preparedStatement.execute();
+        }
+    }
+
+    public void deleteEphemeralData(long ephemeralNodeId) throws SQLException {
+        String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setLong(1, ephemeralNodeId);
+            preparedStatement.execute();
+        }
+    }
+
+    public void deletePersistentData(String key) throws SQLException {
+        String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and 
type = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key);
+            preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
+            preparedStatement.execute();
+        }
+    }
+
+    public void clearExpireLock() {
+        String sql = "delete from t_ds_mysql_registry_lock where last_term < 
?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setTimestamp(1,
+                    new Timestamp(System.currentTimeMillis() - 
expireTimeWindow));
+            int i = preparedStatement.executeUpdate();
+            if (i > 0) {
+                logger.info("Clear expire lock, size: {}", i);
+            }
+        } catch (Exception ex) {
+            logger.warn("Clear expire lock from mysql registry error", ex);
+        }
+    }
+
+    public void clearExpireEphemeralDate() {
+        String sql = "delete from t_ds_mysql_registry_data where 
last_update_time < ? and type = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setTimestamp(1, new 
Timestamp(System.currentTimeMillis() - expireTimeWindow));
+            preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
+            int i = preparedStatement.executeUpdate();
+            if (i > 0) {
+                logger.info("clear expire ephemeral data, size:{}", i);
+            }
+        } catch (Exception ex) {
+            logger.warn("Clear expire ephemeral data from mysql registry 
error", ex);
+        }
+    }
+
+    public MysqlRegistryData getData(String key) throws SQLException {
+        String sql = "SELECT id, `key`, data, type, create_time, 
last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (!resultSet.next()) {
+                    return null;
+                }
+                return MysqlRegistryData.builder()
+                        .id(resultSet.getLong("id"))
+                        .key(resultSet.getString("key"))
+                        .data(resultSet.getString("data"))
+                        .type(resultSet.getInt("type"))
+                        .createTime(resultSet.getTimestamp("create_time"))
+                        
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
+                        .build();
+            }
+        }
+    }
+
+    public List<String> getChildren(String key) throws SQLException {
+        String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` 
like ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key + "%");
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                List<String> result = new 
ArrayList<>(resultSet.getFetchSize());
+                while (resultSet.next()) {
+                    String fullPath = resultSet.getString("key");
+                    if (fullPath.length() > key.length()) {
+                        
result.add(StringUtils.substringBefore(fullPath.substring(key.length() + 1), 
"/"));
+                    }
+                }
+                return result;
+            }
+        }
+    }
+
+    public boolean existKey(String key) throws SQLException {
+        String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (resultSet.next()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Try to acquire the target Lock, if cannot acquire, return null.
+     */
+    public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
+        String sql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, 
last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, 
current_timestamp, current_timestamp)";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+            preparedStatement.setString(1, key);
+            preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
+            preparedStatement.executeUpdate();
+            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
+                if (resultSet.next()) {
+                    long newLockId = resultSet.getLong(1);
+                    return getLockById(newLockId);
+                }
+            }
+            return null;
+        } catch (SQLIntegrityConstraintViolationException e) {
+            // duplicate exception
+            return null;
+        }
+    }
+
+    public MysqlRegistryLock getLockById(long lockId) throws SQLException {
+        String sql = "SELECT `id`, `key`, lock_owner, last_term, 
last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setLong(1, lockId);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (resultSet.next()) {
+                    return MysqlRegistryLock.builder()
+                            .id(resultSet.getLong("id"))
+                            .key(resultSet.getString("key"))
+                            .lockOwner(resultSet.getString("lock_owner"))
+                            .lastTerm(resultSet.getTimestamp("last_term"))
+                            
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
+                            .createTime(resultSet.getTimestamp("create_time"))
+                            .build();
+                }
+            }
+            return null;
+        }
+    }
+
+    // release the lock
+    public boolean releaseLock(long lockId) throws SQLException {
+        String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setLong(1, lockId);
+            int i = preparedStatement.executeUpdate();
+            return i > 0;
+        }
+    }
+
+    public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) 
throws SQLException {
+        String sql = "update t_ds_mysql_registry_data set `last_update_time` = 
current_timestamp() where `id` IN (?)";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            Array idArray = connection.createArrayOf("bigint", 
ephemeralDateIds.toArray());
+            preparedStatement.setArray(1, idArray);
+            return preparedStatement.executeUpdate() > 0;
+        }
+    }
+
+    public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
+        String sql = "update t_ds_mysql_registry_lock set `last_term` = 
current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            Array idArray = connection.createArrayOf("bigint", 
lockIds.toArray());
+            preparedStatement.setArray(1, idArray);
+            return preparedStatement.executeUpdate() > 0;
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!dataSource.isClosed()) {
+            try (HikariDataSource closedDatasource = this.dataSource) {
+
+            }
+        }
+    }
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
new file mode 100644
index 0000000000..fa81471898
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Collection;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * This is one of the implementation of {@link Registry}, with this 
implementation, you need to rely on mysql database to
+ * store the DolphinScheduler master/worker's metadata and do the server 
registry/unRegistry.
+ */
+@Component
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = 
"mysql")
+public class MysqlRegistry implements Registry {
+
+    private static Logger LOGGER = 
LoggerFactory.getLogger(MysqlRegistry.class);
+
+    private final EphemeralDateManager ephemeralDateManager;
+    private final SubscribeDataManager subscribeDataManager;
+    private final RegistryLockManager registryLockManager;
+    private final MysqlOperator mysqlOperator;
+
+    public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) {
+        this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
+        mysqlOperator.clearExpireLock();
+        mysqlOperator.clearExpireEphemeralDate();
+        this.ephemeralDateManager = new 
EphemeralDateManager(mysqlRegistryProperties, mysqlOperator);
+        this.subscribeDataManager = new 
SubscribeDataManager(mysqlRegistryProperties, mysqlOperator);
+        this.registryLockManager = new 
RegistryLockManager(mysqlRegistryProperties, mysqlOperator);
+        LOGGER.info("Initialize Mysql Registry...");
+    }
+
+    @PostConstruct
+    public void start() {
+        LOGGER.info("Starting Mysql Registry...");
+        // start a mysql connect check
+        ephemeralDateManager.start();
+        subscribeDataManager.start();
+        registryLockManager.start();
+        LOGGER.info("Started Mysql Registry...");
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        // new a schedule thread to query the path, if the path
+        subscribeDataManager.addListener(path, listener);
+        return true;
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        subscribeDataManager.removeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        // check the current connection
+        ephemeralDateManager.addConnectionListener(listener);
+    }
+
+    @Override
+    public String get(String key) {
+        // get the key value
+        return subscribeDataManager.getData(key);
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        try {
+            if (deleteOnDisconnect) {
+                // when put a ephemeralData will new a scheduler thread to 
update it
+                ephemeralDateManager.insertOrUpdateEphemeralData(key, value);
+            } else {
+                mysqlOperator.insertOrUpdatePersistentData(key, value);
+            }
+        } catch (Exception ex) {
+            throw new RegistryException(String.format("put key:%s, value:%s 
error", key, value), ex);
+        }
+    }
+
+    @Override
+    public void delete(String key) {
+        try {
+            mysqlOperator.deleteEphemeralData(key);
+            mysqlOperator.deletePersistentData(key);
+        } catch (Exception e) {
+            throw new RegistryException(String.format("Delete key: %s error", 
key), e);
+        }
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        try {
+            return mysqlOperator.getChildren(key);
+        } catch (SQLException e) {
+            throw new RegistryException(String.format("Get key: %s children 
error", key), e);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) {
+        try {
+            return mysqlOperator.existKey(key);
+        } catch (Exception e) {
+            throw new RegistryException(String.format("Check key: %s exist 
error", key), e);
+        }
+    }
+
+    @Override
+    public boolean acquireLock(String key) {
+        try {
+            registryLockManager.acquireLock(key);
+            return true;
+        } catch (RegistryException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RegistryException(String.format("Acquire lock: %s 
error", key), e);
+        }
+    }
+
+    @Override
+    public boolean releaseLock(String key) {
+        registryLockManager.releaseLock(key);
+        return true;
+    }
+
+    @Override
+    public Duration getSessionTimeout() {
+        throw new UnsupportedOperationException("Not support session timeout 
at Mysql Registry");
+    }
+
+    @Override
+    public void close() {
+        LOGGER.info("Closing Mysql Registry...");
+        // remove the current Ephemeral node, if can connect to mysql
+        try (EphemeralDateManager closed1 = ephemeralDateManager;
+             SubscribeDataManager close2 = subscribeDataManager;
+             RegistryLockManager close3 = registryLockManager;
+             MysqlOperator closed4 = mysqlOperator) {
+        } catch (Exception e) {
+            LOGGER.error("Close Mysql Registry error", e);
+        }
+        LOGGER.info("Closed Mysql Registry...");
+    }
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
new file mode 100644
index 0000000000..1482560b7c
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Constants shared by the MySQL registry plugin.
+ */
+@UtilityClass
+public final class MysqlRegistryConstant {
+
+    /**
+     * Pause between two attempts to acquire a lock.
+     * Presumably milliseconds -- confirm against the sleep helper that consumes this value.
+     */
+    public static final long LOCK_ACQUIRE_INTERVAL = 1_000L;
+
+    /**
+     * Identifier of this process as a lock owner, built as {@code <host>_<processId>}.
+     */
+    public static final String LOCK_OWNER = NetUtils.getHost() + "_" + OSUtils.getProcessID();
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
new file mode 100644
index 0000000000..d80b8ead0d
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import java.time.Duration;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import com.zaxxer.hikari.HikariConfig;
+
+import lombok.Data;
+
+/**
+ * Configuration properties of the MySQL registry, bound from the {@code registry} prefix.
+ * The bean is only registered when the property {@code registry.type} has the value {@code mysql}.
+ */
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = 
"mysql")
+@ConfigurationProperties(prefix = "registry")
+public class MysqlRegistryProperties {
+
+    /**
+     * Interval used to schedule the refresh of the ephemeral data / lock terms.
+     * Defaults to 2 seconds.
+     */
+    private Duration termRefreshInterval = Duration.ofSeconds(2);
+    /**
+     * Used to calculate the expire time,
+     * e.g. if you set 2 and the latest two refreshes fail, the ephemeral data/lock will expire.
+     * Defaults to 3.
+     */
+    private int termExpireTimes = 3;
+    /**
+     * HikariCP configuration object; no default is assigned here.
+     * NOTE(review): presumably used to build the MySQL connection pool -- confirm against MysqlOperator.
+     */
+    private HikariConfig hikariConfig;
+
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
new file mode 100644
index 0000000000..657c80b716
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.model;
+
+/**
+ * Kind of a registry entry together with the integer code that represents it.
+ */
+public enum DataType {
+    /** Ephemeral entry, code {@code 1}. */
+    EPHEMERAL(1),
+    /** Persistent entry, code {@code 2}. */
+    PERSISTENT(2),
+    ;
+
+    /** Integer code of this type. */
+    private final int typeValue;
+
+    DataType(int code) {
+        typeValue = code;
+    }
+
+    /**
+     * @return the integer code of this type
+     */
+    public int getTypeValue() {
+        return this.typeValue;
+    }
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
new file mode 100644
index 0000000000..e3045881c5
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.model;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * One registry entry: a key, its value, its type code and bookkeeping timestamps.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MysqlRegistryData {
+
+    // Row identifier.
+    private long id;
+    // Registry key (comparable to a ZooKeeper node path).
+    private String key;
+    // Value stored under the key (comparable to a ZooKeeper node value).
+    private String data;
+    // Type code of the entry: 1 = ephemeral, 2 = persistent.
+    private int type;
+    // Time the entry was created.
+    private Date createTime;
+    // Time of the last update of the entry.
+    private Date lastUpdateTime;
+
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
new file mode 100644
index 0000000000..79d718cd4f
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.model;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * One registry lock held (or to be held) by a lock owner.
+ * Accessors, builder and constructors are generated by Lombok.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MysqlRegistryLock {
+
+    /**
+     * Row identifier of the lock.
+     */
+    private long id;
+    /**
+     * The lock key.
+     */
+    private String key;
+    /**
+     * The owner that acquired the lock.
+     */
+    private String lockOwner;
+    /**
+     * The last term. If (currentTime - lastTerm) is greater than the term expire time,
+     * the lock is considered expired.
+     */
+    private Date lastTerm;
+    /**
+     * The lock last update time.
+     */
+    private Date lastUpdateTime;
+    /**
+     * The lock create time.
+     */
+    private Date createTime;
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
new file mode 100644
index 0000000000..7e8bc53882
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.task;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Keeps the ephemeral registry data written by this process alive and watches the connection to MySQL.
+ * <p>
+ * Once {@link #start()} has been called, a single daemon thread periodically refreshes the term of the
+ * ephemeral rows registered through {@link #insertOrUpdateEphemeralData(String, String)} (or runs a
+ * plain health check when there are none), asks the operator to clear expired ephemeral data, and
+ * notifies the registered {@link ConnectionListener}s when the observed connection state changes.
+ */
+public class EphemeralDateManager implements AutoCloseable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralDateManager.class);
+
+    private final MysqlOperator mysqlOperator;
+    private final MysqlRegistryProperties registryProperties;
+    // Shared between caller threads (add) and the refresh thread (iterate).
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+    // Ids of the ephemeral rows written through this manager.
+    private final Set<Long> ephemeralDateIds = Collections.synchronizedSet(new HashSet<>());
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * @param registryProperties supplies the refresh interval used by {@link #start()}
+     * @param mysqlOperator      database access object, must not be {@code null}
+     */
+    public EphemeralDateManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
+        this.registryProperties = registryProperties;
+        this.mysqlOperator = checkNotNull(mysqlOperator);
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(
+                1,
+                new ThreadFactoryBuilder().setNameFormat("EphemeralDateTermRefreshThread").setDaemon(true).build());
+    }
+
+    /**
+     * Schedules the refresh task with a fixed delay of {@code termRefreshInterval};
+     * the first run also happens one interval after this call.
+     */
+    public void start() {
+        this.scheduledExecutorService.scheduleWithFixedDelay(
+                new EphemeralDateTermRefreshTask(mysqlOperator, connectionListeners, ephemeralDateIds),
+                registryProperties.getTermRefreshInterval().toMillis(),
+                registryProperties.getTermRefreshInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Registers a listener that is notified about connection state changes.
+     *
+     * @param connectionListener the listener to add
+     */
+    public void addConnectionListener(ConnectionListener connectionListener) {
+        connectionListeners.add(connectionListener);
+    }
+
+    /**
+     * Writes an ephemeral entry and remembers its id so that its term is refreshed periodically.
+     *
+     * @param key   the registry key
+     * @param value the value to store
+     * @return the id returned by the operator for the written entry
+     * @throws SQLException if the operator fails
+     */
+    public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
+        long ephemeralId = mysqlOperator.insertOrUpdateEphemeralData(key, value);
+        ephemeralDateIds.add(ephemeralId);
+        return ephemeralId;
+    }
+
+    /**
+     * Stops the refresh thread, drops the listeners and deletes the ephemeral data owned by this manager.
+     * <p>
+     * The ids are copied before the set is cleared; clearing first (as an earlier version did) leaves
+     * nothing to iterate, so no ephemeral row would ever be deleted on shutdown.
+     *
+     * @throws SQLException if deleting an ephemeral entry fails; entries after the failing one are not deleted
+     */
+    @Override
+    public void close() throws SQLException {
+        scheduledExecutorService.shutdownNow();
+        connectionListeners.clear();
+        List<Long> idsToDelete;
+        // Iterating/copying a synchronized collection requires holding its monitor.
+        synchronized (ephemeralDateIds) {
+            idsToDelete = new ArrayList<>(ephemeralDateIds);
+            ephemeralDateIds.clear();
+        }
+        for (Long ephemeralDateId : idsToDelete) {
+            mysqlOperator.deleteEphemeralData(ephemeralDateId);
+        }
+    }
+
+    // Use this task to refresh ephemeral term and check the connect state.
+    static class EphemeralDateTermRefreshTask implements Runnable {
+        private final List<ConnectionListener> connectionListeners;
+        private final Set<Long> ephemeralDateIds;
+        private final MysqlOperator mysqlOperator;
+        // Last state reported to the listeners; null until the first run. Only touched by the refresh thread.
+        private ConnectionState connectionState;
+
+        private EphemeralDateTermRefreshTask(MysqlOperator mysqlOperator,
+                                             List<ConnectionListener> connectionListeners,
+                                             Set<Long> ephemeralDateIds) {
+            this.mysqlOperator = checkNotNull(mysqlOperator);
+            this.connectionListeners = checkNotNull(connectionListeners);
+            this.ephemeralDateIds = checkNotNull(ephemeralDateIds);
+        }
+
+        /**
+         * Probes the connection and notifies the listeners on a state transition:
+         * CONNECTED to DISCONNECTED is reported as DISCONNECTED, DISCONNECTED to CONNECTED as RECONNECTED,
+         * and the very first observation is reported as-is.
+         */
+        @Override
+        public void run() {
+            try {
+                ConnectionState currentConnectionState = getConnectionState();
+                if (currentConnectionState == connectionState) {
+                    // no state change
+                    return;
+                }
+
+                if (connectionState == ConnectionState.CONNECTED) {
+                    if (currentConnectionState == ConnectionState.DISCONNECTED) {
+                        connectionState = ConnectionState.DISCONNECTED;
+                        triggerListener(ConnectionState.DISCONNECTED);
+                    }
+                } else if (connectionState == ConnectionState.DISCONNECTED) {
+                    if (currentConnectionState == ConnectionState.CONNECTED) {
+                        connectionState = ConnectionState.CONNECTED;
+                        triggerListener(ConnectionState.RECONNECTED);
+                    }
+                } else if (connectionState == null) {
+                    connectionState = currentConnectionState;
+                    triggerListener(connectionState);
+                }
+            } catch (Exception e) {
+                LOGGER.error("Mysql Registry connect state check task execute failed", e);
+                connectionState = ConnectionState.DISCONNECTED;
+                triggerListener(ConnectionState.DISCONNECTED);
+            }
+        }
+
+        /**
+         * Refreshes the ephemeral terms (or runs a health check when no ephemeral id is registered) and
+         * clears expired ephemeral data.
+         *
+         * @return CONNECTED when all operator calls succeed, DISCONNECTED when any of them throws
+         */
+        private ConnectionState getConnectionState() {
+            try {
+                if (ephemeralDateIds.isEmpty()) {
+                    mysqlOperator.healthCheck();
+                } else {
+                    updateEphemeralDateTerm();
+                }
+                mysqlOperator.clearExpireEphemeralDate();
+                return ConnectionState.CONNECTED;
+            } catch (Exception ex) {
+                return ConnectionState.DISCONNECTED;
+            }
+        }
+
+        // Refreshes the term of all registered ephemeral ids; a false result is only logged.
+        private void updateEphemeralDateTerm() throws SQLException {
+            if (!mysqlOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
+                LOGGER.warn("Update mysql registry ephemeral data: {} term error", ephemeralDateIds);
+            }
+        }
+
+        /**
+         * Notifies every listener about {@code connectionState}.
+         * <p>
+         * The listeners are copied under the list's monitor because the list is modified by other threads.
+         * A failing listener is logged and skipped: an exception escaping {@link #run()} would make the
+         * scheduler cancel all further executions of this task.
+         */
+        private void triggerListener(ConnectionState connectionState) {
+            List<ConnectionListener> snapshot;
+            synchronized (connectionListeners) {
+                snapshot = new ArrayList<>(connectionListeners);
+            }
+            for (ConnectionListener connectionListener : snapshot) {
+                try {
+                    connectionListener.onUpdate(connectionState);
+                } catch (Exception e) {
+                    LOGGER.error("Mysql Registry connection listener failed on state: {}", connectionState, e);
+                }
+            }
+        }
+    }
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
new file mode 100644
index 0000000000..478727545b
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.task;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryConstant;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Tracks the registry locks held by this process and keeps their terms refreshed.
+ * <p>
+ * Once {@link #start()} has been called, a single daemon thread periodically refreshes the term of
+ * every held lock and asks the operator to clear expired locks.
+ */
+public class RegistryLockManager implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegistryLockManager.class);
+
+    private final MysqlOperator mysqlOperator;
+    private final MysqlRegistryProperties registryProperties;
+    // Locks currently held by this process, by lock key.
+    private final Map<String, MysqlRegistryLock> lockHoldMap;
+    private final ScheduledExecutorService lockTermUpdateThreadPool;
+
+    /**
+     * @param registryProperties supplies the refresh interval used by {@link #start()}
+     * @param mysqlOperator      database access object
+     */
+    public RegistryLockManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
+        this.registryProperties = registryProperties;
+        this.mysqlOperator = mysqlOperator;
+        this.lockHoldMap = new ConcurrentHashMap<>();
+        this.lockTermUpdateThreadPool = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("MysqlRegistryLockTermRefreshThread").setDaemon(true).build());
+    }
+
+    /**
+     * Schedules the lock term refresh task with a fixed delay of {@code termRefreshInterval};
+     * the first run also happens one interval after this call.
+     */
+    public void start() {
+        lockTermUpdateThreadPool.scheduleWithFixedDelay(
+                new LockTermRefreshTask(lockHoldMap, mysqlOperator),
+                registryProperties.getTermRefreshInterval().toMillis(),
+                registryProperties.getTermRefreshInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Acquire the lock, if cannot get the lock will await.
+     * <p>
+     * Returns immediately when this manager already holds {@code lockKey}.
+     *
+     * @param lockKey the lock key
+     * @throws RegistryException if the operator fails with an {@link SQLException}
+     */
+    public void acquireLock(String lockKey) throws RegistryException {
+        // NOTE(review): the retry loop (with sleeps) runs inside computeIfAbsent, so other updates that
+        // hash to the same bin of lockHoldMap may block until the lock is acquired. Left as is because
+        // moving it out would change how concurrent callers of the same key are serialized.
+        lockHoldMap.computeIfAbsent(lockKey, key -> {
+            MysqlRegistryLock mysqlRegistryLock;
+            try {
+                while ((mysqlRegistryLock = mysqlOperator.tryToAcquireLock(lockKey)) == null) {
+                    logger.debug("Acquire the lock {} failed try again", key);
+                    // acquire failed, wait and try again
+                    ThreadUtils.sleep(MysqlRegistryConstant.LOCK_ACQUIRE_INTERVAL);
+                }
+            } catch (SQLException e) {
+                throw new RegistryException("Acquire the lock error", e);
+            }
+            return mysqlRegistryLock;
+        });
+    }
+
+    /**
+     * Releases {@code lockKey} if this manager holds it; does nothing otherwise.
+     *
+     * @param lockKey the lock key
+     * @throws RegistryException if the operator fails; the lock then stays registered as held
+     */
+    public void releaseLock(String lockKey) {
+        MysqlRegistryLock mysqlRegistryLock = lockHoldMap.get(lockKey);
+        if (mysqlRegistryLock != null) {
+            try {
+                // Release in the database first; only forget the lock locally once that succeeded.
+                mysqlOperator.releaseLock(mysqlRegistryLock.getId());
+                lockHoldMap.remove(lockKey);
+            } catch (SQLException e) {
+                throw new RegistryException(String.format("Release lock: %s error", lockKey), e);
+            }
+        }
+    }
+
+    /**
+     * Stops the refresh thread and releases every held lock.
+     * <p>
+     * A failing release does not stop the remaining locks from being released; the first failure is
+     * rethrown at the end with the later ones attached as suppressed exceptions.
+     *
+     * @throws RegistryException if at least one lock could not be released
+     */
+    @Override
+    public void close() {
+        lockTermUpdateThreadPool.shutdownNow();
+        RegistryException firstFailure = null;
+        // ConcurrentHashMap views tolerate the removals done by releaseLock during iteration.
+        for (String lockKey : lockHoldMap.keySet()) {
+            try {
+                releaseLock(lockKey);
+            } catch (RegistryException e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                } else {
+                    firstFailure.addSuppressed(e);
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * This task is used to refresh the lock held by the current server.
+     */
+    @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+    static class LockTermRefreshTask implements Runnable {
+        private final Map<String, MysqlRegistryLock> lockHoldMap;
+        private final MysqlOperator mysqlOperator;
+
+        /**
+         * Refreshes the term of the held locks (if any) and then clears expired locks.
+         * Failures are logged and never propagate, so the schedule keeps running.
+         */
+        @Override
+        public void run() {
+            try {
+                // Decide on the snapshot, not on the live map, so an empty id list is never sent.
+                List<Long> lockIds = lockHoldMap.values()
+                        .stream()
+                        .map(MysqlRegistryLock::getId)
+                        .collect(Collectors.toList());
+                if (!lockIds.isEmpty() && !mysqlOperator.updateLockTerm(lockIds)) {
+                    logger.warn("Update the lock: {} term failed.", lockIds);
+                }
+                // Clear expired locks even when this process holds none: otherwise a process that is
+                // only waiting for a lock would never purge the expired lock it is waiting on.
+                mysqlOperator.clearExpireLock();
+            } catch (Exception e) {
+                logger.error("Update lock term error", e);
+            }
+        }
+    }
+}
+
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
new file mode 100644
index 0000000000..4c8d7190f3
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.task;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
+import 
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Polls the registry data and notifies subscribers about added, removed and updated entries.
+ * <p>
+ * Once {@link #start()} has been called, a single daemon thread periodically loads all registry data,
+ * compares it with the previously loaded data, refreshes the local cache served by
+ * {@link #getData(String)} and triggers the listeners whose subscribed path is a prefix of a changed key.
+ */
+public class SubscribeDataManager implements AutoCloseable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeDataManager.class);
+
+    private final MysqlOperator mysqlOperator;
+    private final MysqlRegistryProperties registryProperties;
+    // Listeners by subscribed path. The lists are never mutated after being published (see addListener).
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService dataSubscribeCheckThreadPool;
+    // Local cache of the registry data as of the last successful poll, by key.
+    private final Map<String, MysqlRegistryData> mysqlRegistryDataMap = new ConcurrentHashMap<>();
+
+    /**
+     * @param registryProperties supplies the refresh interval used by {@link #start()}
+     * @param mysqlOperator      database access object
+     */
+    public SubscribeDataManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
+        this.registryProperties = registryProperties;
+        this.mysqlOperator = mysqlOperator;
+        this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
+                1,
+                new ThreadFactoryBuilder().setNameFormat("MysqlRegistrySubscribeDataCheckThread").setDaemon(true).build());
+    }
+
+    /**
+     * Schedules the data check task with a fixed delay of {@code termRefreshInterval};
+     * the first run also happens one interval after this call.
+     */
+    public void start() {
+        dataSubscribeCheckThreadPool.scheduleWithFixedDelay(
+                new RegistrySubscribeDataCheckTask(dataSubScribeMap, mysqlOperator, mysqlRegistryDataMap),
+                registryProperties.getTermRefreshInterval().toMillis(),
+                registryProperties.getTermRefreshInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Subscribes {@code subscribeListener} to changes of keys starting with {@code path}.
+     *
+     * @param path              the subscribed path (matched as a key prefix)
+     * @param subscribeListener the listener to notify
+     */
+    public void addListener(String path, SubscribeListener subscribeListener) {
+        // Copy-on-write inside compute(): the check thread iterates these lists concurrently, so a
+        // published list must never be modified in place.
+        dataSubScribeMap.compute(path, (key, current) -> {
+            List<SubscribeListener> updated = current == null ? new ArrayList<>() : new ArrayList<>(current);
+            updated.add(subscribeListener);
+            return updated;
+        });
+    }
+
+    /**
+     * Removes all listeners subscribed to {@code path}.
+     *
+     * @param path the subscribed path
+     */
+    public void removeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    /**
+     * Reads a value from the local cache filled by the last poll.
+     *
+     * @param path the registry key
+     * @return the cached value, or {@code null} when the key is not in the cache
+     */
+    public String getData(String path) {
+        MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMap.get(path);
+        if (mysqlRegistryData == null) {
+            return null;
+        }
+        return mysqlRegistryData.getData();
+    }
+
+    /**
+     * Stops the check thread and drops all subscriptions.
+     */
+    @Override
+    public void close() {
+        dataSubscribeCheckThreadPool.shutdownNow();
+        dataSubScribeMap.clear();
+    }
+
+    /**
+     * Periodic task that diffs the current registry data against the cached data.
+     */
+    @RequiredArgsConstructor
+    static class RegistrySubscribeDataCheckTask implements Runnable {
+
+        private final Map<String, List<SubscribeListener>> dataSubScribeMap;
+        private final MysqlOperator mysqlOperator;
+        private final Map<String, MysqlRegistryData> mysqlRegistryDataMap;
+
+        /**
+         * Loads all registry data, computes the added / deleted / updated entries relative to the cache,
+         * refreshes the cache and notifies the listeners. Failures are logged and never propagate.
+         */
+        @Override
+        public void run() {
+            // query the full data from database, and update the mysqlRegistryDataMap
+            try {
+                Map<String, MysqlRegistryData> currentMysqlDataMap = mysqlOperator.queryAllMysqlRegistryData()
+                        .stream()
+                        .collect(Collectors.toMap(MysqlRegistryData::getKey, Function.identity()));
+                // find the different
+                List<MysqlRegistryData> addedData = new ArrayList<>();
+                List<MysqlRegistryData> deletedData = new ArrayList<>();
+                List<MysqlRegistryData> updatedData = new ArrayList<>();
+                for (Map.Entry<String, MysqlRegistryData> entry : currentMysqlDataMap.entrySet()) {
+                    MysqlRegistryData newData = entry.getValue();
+                    MysqlRegistryData oldData = mysqlRegistryDataMap.get(entry.getKey());
+                    if (oldData == null) {
+                        addedData.add(newData);
+                    } else if (isUpdated(oldData, newData)) {
+                        updatedData.add(newData);
+                    }
+                }
+                for (Map.Entry<String, MysqlRegistryData> entry : mysqlRegistryDataMap.entrySet()) {
+                    if (!currentMysqlDataMap.containsKey(entry.getKey())) {
+                        deletedData.add(entry.getValue());
+                    }
+                }
+                // Drop the stale keys and overwrite the rest instead of clear() + putAll(), so that
+                // concurrent getData() calls never observe a transiently empty cache.
+                mysqlRegistryDataMap.keySet().retainAll(currentMysqlDataMap.keySet());
+                mysqlRegistryDataMap.putAll(currentMysqlDataMap);
+                // trigger listener
+                for (Map.Entry<String, List<SubscribeListener>> entry : dataSubScribeMap.entrySet()) {
+                    String subscribeKey = entry.getKey();
+                    List<SubscribeListener> subscribeListeners = entry.getValue();
+                    triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD);
+                    triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE);
+                    triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
+                }
+            } catch (Exception e) {
+                // Keep the cause: without it a failing poll cannot be diagnosed.
+                LOGGER.error("Query data from mysql registry error", e);
+            }
+        }
+
+        // An entry counts as updated when its last update time differs; null times are compared safely.
+        private static boolean isUpdated(MysqlRegistryData oldData, MysqlRegistryData newData) {
+            Object oldTime = oldData.getLastUpdateTime();
+            Object newTime = newData.getLastUpdateTime();
+            return newTime == null ? oldTime != null : !newTime.equals(oldTime);
+        }
+
+        // Sends one event of the given type per entry whose key starts with subscribeKey to every listener.
+        private void triggerListener(List<MysqlRegistryData> dataList,
+                                     String subscribeKey,
+                                     List<SubscribeListener> subscribeListeners,
+                                     Event.Type type) {
+            for (MysqlRegistryData data : dataList) {
+                if (data.getKey().startsWith(subscribeKey)) {
+                    subscribeListeners.forEach(subscribeListener ->
+                            subscribeListener.notify(new Event(data.getKey(), data.getKey(), data.getData(), type)));
+                }
+            }
+        }
+
+    }
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
new file mode 100644
index 0000000000..6fc936fc6e
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+SET FOREIGN_KEY_CHECKS = 0;
+
+/*
+ * Registry data table: one row per registry key (the unique constraint on
+ * `key` forbids duplicates). `type` marks a row as ephemeral (1) or
+ * persistent (2). Any existing table is dropped first, so re-running this
+ * script discards all stored registry data.
+ */
+DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
+CREATE TABLE `t_ds_mysql_registry_data`
+(
+    `id`               bigint(11)   NOT NULL AUTO_INCREMENT COMMENT 'primary 
key',
+    `key`              varchar(200) NOT NULL COMMENT 'key, like zookeeper node 
path',
+    `data`             varchar(200) NOT NULL COMMENT 'data, like zookeeper 
node value',
+    `type`             tinyint(4)   NOT NULL COMMENT '1: ephemeral node, 2: 
persistent node',
+    `last_update_time` timestamp    NULL COMMENT 'last update time',
+    `create_time`      timestamp    NULL COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    unique (`key`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8;
+
+
+/*
+ * Registry lock table: one row per lock path (unique constraint on `key`),
+ * recording the owner (`lock_owner`, formatted ip_processId) and its last
+ * term time (`last_term`). Any existing table is dropped first, so
+ * re-running this script discards all lock rows.
+ * NOTE(review): presumably the unique key is what makes acquiring a lock
+ * exclusive; confirm against the code that inserts into this table.
+ */
+DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
+CREATE TABLE `t_ds_mysql_registry_lock`
+(
+    `id`               bigint(11)   NOT NULL AUTO_INCREMENT COMMENT 'primary 
key',
+    `key`              varchar(200) NOT NULL COMMENT 'lock path',
+    `lock_owner`       varchar(100) NOT NULL COMMENT 'the lock owner, 
ip_processId',
+    `last_term`        timestamp    NOT NULL COMMENT 'last term time',
+    `last_update_time` timestamp    NULL COMMENT 'last update time',
+    `create_time`      timestamp    NULL COMMENT 'lock create time',
+    PRIMARY KEY (`id`),
+    unique (`key`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8;
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index e0f818e665..ba4131401b 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -23,8 +23,6 @@ import 
org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.Event;
 import org.apache.dolphinscheduler.registry.api.Registry;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.RegistryProperties;
-import 
org.apache.dolphinscheduler.registry.api.RegistryProperties.ZookeeperProperties;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -59,14 +57,14 @@ import com.google.common.base.Strings;
 @Component
 @ConditionalOnProperty(prefix = "registry", name = "type", havingValue = 
"zookeeper")
 public final class ZookeeperRegistry implements Registry {
-    private final ZookeeperProperties properties;
+    private final ZookeeperRegistryProperties.ZookeeperProperties properties;
     private final CuratorFramework client;
 
     private final Map<String, TreeCache> treeCacheMap = new 
ConcurrentHashMap<>();
 
     private static final ThreadLocal<Map<String, InterProcessMutex>> 
threadLocalLockMap = new ThreadLocal<>();
 
-    public ZookeeperRegistry(RegistryProperties registryProperties) {
+    public ZookeeperRegistry(ZookeeperRegistryProperties registryProperties) {
         properties = registryProperties.getZookeeper();
 
         final ExponentialBackoffRetry retryPolicy = new 
ExponentialBackoffRetry(
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
similarity index 75%
rename from 
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java
rename to 
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
index dbe9ac1046..84026a34f0 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
@@ -1,44 +1,34 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.registry.api;
+package org.apache.dolphinscheduler.plugin.registry.zookeeper;
 
 import java.time.Duration;
 
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
 @Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = 
"zookeeper")
 @ConfigurationProperties(prefix = "registry")
-public class RegistryProperties {
-    private Type type;
+public class ZookeeperRegistryProperties {
     private ZookeeperProperties zookeeper = new ZookeeperProperties();
 
-    public Type getType() {
-        return type;
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
     public ZookeeperProperties getZookeeper() {
         return zookeeper;
     }
@@ -47,10 +37,6 @@ public class RegistryProperties {
         this.zookeeper = zookeeper;
     }
 
-    public enum Type {
-        ZOOKEEPER
-    }
-
     public static final class ZookeeperProperties {
         private String namespace;
         private String connectString;
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
index 312153addc..4665f5011a 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.plugin.registry.zookeeper;
 
 import org.apache.dolphinscheduler.registry.api.Event;
-import org.apache.dolphinscheduler.registry.api.RegistryProperties;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 
 import org.apache.curator.test.TestingServer;
@@ -48,7 +47,7 @@ public class ZookeeperRegistryTest {
     public void before() throws Exception {
         server = new TestingServer(true);
 
-        RegistryProperties p = new RegistryProperties();
+        ZookeeperRegistryProperties p = new ZookeeperRegistryProperties();
         p.getZookeeper().setConnectString(server.getConnectString());
         registry = new ZookeeperRegistry(p);
         registry.start();
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
index 1defc2d294..e81cc076eb 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
@@ -22,8 +22,8 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>dolphinscheduler-registry</artifactId>
         <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-registry</artifactId>
         <version>dev-SNAPSHOT</version>
     </parent>
     <artifactId>dolphinscheduler-registry-plugins</artifactId>
@@ -32,5 +32,6 @@
 
     <modules>
         <module>dolphinscheduler-registry-zookeeper</module>
+        <module>dolphinscheduler-registry-mysql</module>
     </modules>
 </project>
diff --git a/dolphinscheduler-registry/pom.xml 
b/dolphinscheduler-registry/pom.xml
index 79a44079f8..89e0565924 100644
--- a/dolphinscheduler-registry/pom.xml
+++ b/dolphinscheduler-registry/pom.xml
@@ -48,5 +48,6 @@
     <modules>
         <module>dolphinscheduler-registry-api</module>
         <module>dolphinscheduler-registry-plugins</module>
+        <module>dolphinscheduler-registry-all</module>
     </modules>
 </project>
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index ee865769fa..9ef5e94cd0 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -59,7 +59,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+            <artifactId>dolphinscheduler-registry-all</artifactId>
         </dependency>
 
         <dependency>
diff --git a/pom.xml b/pom.xml
index f8a325ee77..5d493de04c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
         <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
         <aws.sdk.version>1.12.160</aws.sdk.version>
         <joda-time.version>2.10.13</joda-time.version>
+        <lombok.version>1.18.20</lombok.version>
         <docker.hub>apache</docker.hub>
         <docker.repo>${project.name}</docker.repo>
         <docker.tag>${project.version}</docker.tag>
@@ -273,6 +274,11 @@
                 <artifactId>dolphinscheduler-registry-plugin</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-registry-all</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.dolphinscheduler</groupId>
                 <artifactId>dolphinscheduler-dao</artifactId>
@@ -385,6 +391,11 @@
                 <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-registry-mysql</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>org.apache.dolphinscheduler</groupId>
@@ -1222,6 +1233,12 @@
             <version>${auto-service.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito2</artifactId>

Reply via email to