This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 78c5fcc6ac Add mysql registry plugin (#10406)
78c5fcc6ac is described below
commit 78c5fcc6acd025c2ecdc736f8511c334c1e90c96
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jun 13 11:24:42 2022 +0800
Add mysql registry plugin (#10406)
* Add mysql registry plugin
---
dolphinscheduler-api/pom.xml | 2 +-
dolphinscheduler-master/pom.xml | 2 +-
.../{ => dolphinscheduler-registry-all}/pom.xml | 30 +-
.../dolphinscheduler-registry-mysql/README.md | 36 +++
.../{ => dolphinscheduler-registry-mysql}/pom.xml | 36 ++-
.../plugin/registry/mysql/MysqlOperator.java | 326 +++++++++++++++++++++
.../plugin/registry/mysql/MysqlRegistry.java | 177 +++++++++++
.../registry/mysql/MysqlRegistryConstant.java | 31 ++
.../registry/mysql/MysqlRegistryProperties.java | 47 +++
.../plugin/registry/mysql/model/DataType.java | 33 +++
.../registry/mysql/model/MysqlRegistryData.java | 40 +++
.../registry/mysql/model/MysqlRegistryLock.java | 54 ++++
.../registry/mysql/task/EphemeralDateManager.java | 162 ++++++++++
.../registry/mysql/task/RegistryLockManager.java | 137 +++++++++
.../registry/mysql/task/SubscribeDataManager.java | 156 ++++++++++
.../src/main/resources/mysql_registry_init.sql | 47 +++
.../registry/zookeeper/ZookeeperRegistry.java | 6 +-
.../zookeeper/ZookeeperRegistryProperties.java} | 46 +--
.../registry/zookeeper/ZookeeperRegistryTest.java | 3 +-
.../dolphinscheduler-registry-plugins/pom.xml | 3 +-
dolphinscheduler-registry/pom.xml | 1 +
dolphinscheduler-worker/pom.xml | 2 +-
pom.xml | 17 ++
23 files changed, 1326 insertions(+), 68 deletions(-)
diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index 6007ac1d9e..119dbe1b84 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -62,7 +62,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+ <artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
diff --git a/dolphinscheduler-master/pom.xml b/dolphinscheduler-master/pom.xml
index f21d281ccd..fadde209a9 100644
--- a/dolphinscheduler-master/pom.xml
+++ b/dolphinscheduler-master/pom.xml
@@ -49,7 +49,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+ <artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
diff --git a/dolphinscheduler-registry/pom.xml
b/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
similarity index 64%
copy from dolphinscheduler-registry/pom.xml
copy to dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
index 79a44079f8..de853f0eed 100644
--- a/dolphinscheduler-registry/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
@@ -19,34 +19,24 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>dolphinscheduler</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dolphinscheduler-registry</artifactId>
- <packaging>pom</packaging>
+
+ <artifactId>dolphinscheduler-registry-all</artifactId>
<dependencies>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-autoconfigure</artifactId>
- <scope>provided</scope>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>javax.annotation</groupId>
- <artifactId>javax.annotation-api</artifactId>
- <scope>provided</scope>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-mysql</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
-
- <modules>
- <module>dolphinscheduler-registry-api</module>
- <module>dolphinscheduler-registry-plugins</module>
- </modules>
-</project>
+</project>
\ No newline at end of file
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
new file mode 100644
index 0000000000..a7a68a3d45
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
@@ -0,0 +1,36 @@
+# Introduction
+
+This module is the MySQL registry plugin; it uses MySQL as the registry center.
+
+# How to use
+
+To use MySQL as the registry center, complete the following two steps:
+
+1. Initialize the mysql table
+
+You can directly execute the sql script
`src/main/resources/mysql_registry_init.sql`.
+
+2. Open the config
+
+You need to set the registry properties in the application.yml of the master, worker and api servers:
+
+```yaml
+registry:
+ type: mysql
+ term-refresh-interval: 2s
+ term-expire-times: 3
+ hikari-config:
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
+ username: root
+ password: root
+ maximum-pool-size: 5
+ connection-timeout: 9000
+ idle-timeout: 600000
+```
+
+After completing these two steps, you can start your DolphinScheduler cluster; the cluster will use MySQL as the registry center to
+store server metadata.
+
+NOTE: You need to add `mysql-connector-java.jar` to the DolphinScheduler classpath, since this plugin does not bundle the driver in its distribution.
+For details, see <a href="https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/pseudo-cluster.html">Initialize the Database</a>.
\ No newline at end of file
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
similarity index 58%
copy from dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
copy to
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
index 1defc2d294..150b058354 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
@@ -17,20 +17,40 @@
~ specific language governing permissions and limitations
~ under the License.
-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>dolphinscheduler-registry</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-plugins</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
- <artifactId>dolphinscheduler-registry-plugins</artifactId>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <modules>
- <module>dolphinscheduler-registry-zookeeper</module>
- </modules>
-</project>
+ <artifactId>dolphinscheduler-registry-mysql</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
new file mode 100644
index 0000000000..d5dd507f92
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+/**
+ * Used to CRUD from mysql
+ */
+public class MysqlOperator implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MysqlOperator.class);
+
+ private final HikariDataSource dataSource;
+ private final long expireTimeWindow;
+
+    /**
+     * Creates the operator: derives the expire window from the registry properties
+     * (termExpireTimes * termRefreshInterval, in milliseconds) and opens the Hikari pool.
+     *
+     * @param registryProperties registry settings, including the Hikari pool configuration
+     */
+    public MysqlOperator(MysqlRegistryProperties registryProperties) {
+        int expireTimes = registryProperties.getTermExpireTimes();
+        long refreshIntervalMillis = registryProperties.getTermRefreshInterval().toMillis();
+        this.expireTimeWindow = expireTimes * refreshIntervalMillis;
+
+        HikariConfig poolConfig = registryProperties.getHikariConfig();
+        poolConfig.setPoolName("MysqlRegistryDataSourcePool");
+        this.dataSource = new HikariDataSource(poolConfig);
+    }
+
+    /**
+     * Probes the registry data table; completing without an exception means the check passed.
+     * The query is capped with {@code limit 1} so the probe does not return one row per
+     * registry entry.
+     *
+     * @throws SQLException if a connection cannot be obtained or the query fails
+     */
+    public void healthCheck() throws SQLException {
+        String sql = "select 1 from t_ds_mysql_registry_data limit 1";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(sql);
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            // if no exception, the healthCheck success
+        }
+    }
+
+    /**
+     * Loads every row of t_ds_mysql_registry_data.
+     *
+     * @return all registry rows, possibly empty
+     * @throws SQLException if the query fails
+     */
+    public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException {
+        String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(sql);
+             ResultSet resultSet = preparedStatement.executeQuery()) {
+            // ResultSet.getFetchSize() is a driver fetch hint, not a row count (and may be
+            // negative), so it must not be used as the initial list capacity.
+            List<MysqlRegistryData> result = new ArrayList<>();
+            while (resultSet.next()) {
+                MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
+                    .id(resultSet.getLong("id"))
+                    .key(resultSet.getString("key"))
+                    .data(resultSet.getString("data"))
+                    .type(resultSet.getInt("type"))
+                    .createTime(resultSet.getTimestamp("create_time"))
+                    .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
+                    .build();
+                result.add(mysqlRegistryData);
+            }
+            return result;
+        }
+    }
+
+ public long insertOrUpdateEphemeralData(String key, String value) throws
SQLException {
+ String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type,
create_time, last_update_time) VALUES (?, ?, ?, current_timestamp,
current_timestamp)" +
+ "ON DUPLICATE KEY UPDATE data=?,
last_update_time=current_timestamp";
+ // put a ephemeralData
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+ preparedStatement.setString(1, key);
+ preparedStatement.setString(2, value);
+ preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
+ preparedStatement.setString(4, value);
+ int insertCount = preparedStatement.executeUpdate();
+ ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
+ if (insertCount < 1 || !generatedKeys.next()) {
+ throw new SQLException("Insert or update ephemeral data
error");
+ }
+ return generatedKeys.getLong(1);
+ }
+ }
+
+ public long insertOrUpdatePersistentData(String key, String value) throws
SQLException {
+ String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type,
create_time, last_update_time) VALUES (?, ?, ?, current_timestamp,
current_timestamp)" +
+ "ON DUPLICATE KEY UPDATE data=?,
last_update_time=current_timestamp";
+ // put a persistent Data
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+ preparedStatement.setString(1, key);
+ preparedStatement.setString(2, value);
+ preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
+ preparedStatement.setString(4, value);
+ int insertCount = preparedStatement.executeUpdate();
+ ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
+ if (insertCount < 1 || !generatedKeys.next()) {
+ throw new SQLException("Insert or update persistent data
error");
+ }
+ return generatedKeys.getLong(1);
+ }
+ }
+
+ public void deleteEphemeralData(String key) throws SQLException {
+ String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and
type = ?";
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, key);
+ preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
+ preparedStatement.execute();
+ }
+ }
+
+    /**
+     * Removes the registry data row with the given id. The statement filters on id only.
+     *
+     * @throws SQLException if the delete fails
+     */
+    public void deleteEphemeralData(long ephemeralNodeId) throws SQLException {
+        final String deleteSql = "DELETE from t_ds_mysql_registry_data where `id` = ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
+            stmt.setLong(1, ephemeralNodeId);
+            stmt.execute();
+        }
+    }
+
+ public void deletePersistentData(String key) throws SQLException {
+ String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and
type = ?";
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, key);
+ preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
+ preparedStatement.execute();
+ }
+ }
+
+    /**
+     * Deletes lock rows whose last_term is older than (now - expireTimeWindow).
+     * Any failure is logged at WARN level and not rethrown.
+     */
+    public void clearExpireLock() {
+        final String deleteSql = "delete from t_ds_mysql_registry_lock where last_term < ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
+            Timestamp expireBefore = new Timestamp(System.currentTimeMillis() - expireTimeWindow);
+            stmt.setTimestamp(1, expireBefore);
+            int removed = stmt.executeUpdate();
+            if (removed > 0) {
+                logger.info("Clear expire lock, size: {}", removed);
+            }
+        } catch (Exception ex) {
+            logger.warn("Clear expire lock from mysql registry error", ex);
+        }
+    }
+
+    /**
+     * Deletes EPHEMERAL data rows whose last_update_time is older than (now - expireTimeWindow).
+     * Any failure is logged at WARN level and not rethrown.
+     */
+    public void clearExpireEphemeralDate() {
+        final String deleteSql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
+            Timestamp expireBefore = new Timestamp(System.currentTimeMillis() - expireTimeWindow);
+            stmt.setTimestamp(1, expireBefore);
+            stmt.setInt(2, DataType.EPHEMERAL.getTypeValue());
+            int removed = stmt.executeUpdate();
+            if (removed > 0) {
+                logger.info("clear expire ephemeral data, size:{}", removed);
+            }
+        } catch (Exception ex) {
+            logger.warn("Clear expire ephemeral data from mysql registry error", ex);
+        }
+    }
+
+    /**
+     * Looks up the registry row stored under {@code key}.
+     *
+     * @return the first matching row, or {@code null} when no row has that key
+     * @throws SQLException if the query fails
+     */
+    public MysqlRegistryData getData(String key) throws SQLException {
+        final String querySql =
+            "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(querySql)) {
+            stmt.setString(1, key);
+            try (ResultSet rows = stmt.executeQuery()) {
+                if (rows.next()) {
+                    return MysqlRegistryData.builder()
+                        .id(rows.getLong("id"))
+                        .key(rows.getString("key"))
+                        .data(rows.getString("data"))
+                        .type(rows.getInt("type"))
+                        .createTime(rows.getTimestamp("create_time"))
+                        .lastUpdateTime(rows.getTimestamp("last_update_time"))
+                        .build();
+                }
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Lists the distinct direct child names below {@code key}.
+     *
+     * <p>The SQL {@code LIKE} match is only a coarse pre-filter: it also returns sibling keys
+     * that merely share the prefix (for example {@code /a/bc} for {@code /a/b}) and treats
+     * {@code _} and {@code %} inside the key as wildcards. Each row is therefore re-checked
+     * against the exact {@code key + "/"} prefix before its child segment is extracted.
+     *
+     * @param key parent path; a trailing "/" is tolerated
+     * @return child names in first-seen order without duplicates, possibly empty
+     * @throws SQLException if the query fails
+     */
+    public List<String> getChildren(String key) throws SQLException {
+        String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?";
+        String childPrefix = key.endsWith("/") ? key : key + "/";
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, key + "%");
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                // getFetchSize() is a fetch hint, not a row count, so use the default capacity.
+                List<String> result = new ArrayList<>();
+                while (resultSet.next()) {
+                    String fullPath = resultSet.getString("key");
+                    // Skip the parent itself and keys that only share a textual prefix.
+                    if (fullPath.length() <= childPrefix.length() || !fullPath.startsWith(childPrefix)) {
+                        continue;
+                    }
+                    String child = StringUtils.substringBefore(fullPath.substring(childPrefix.length()), "/");
+                    // Nested descendants map to the same direct child; report it once.
+                    if (!result.contains(child)) {
+                        result.add(child);
+                    }
+                }
+                return result;
+            }
+        }
+    }
+
+    /**
+     * Reports whether at least one registry data row is stored under {@code key}.
+     *
+     * @throws SQLException if the query fails
+     */
+    public boolean existKey(String key) throws SQLException {
+        final String querySql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(querySql)) {
+            stmt.setString(1, key);
+            try (ResultSet rows = stmt.executeQuery()) {
+                // A first row means the key is present.
+                return rows.next();
+            }
+        }
+    }
+
+    /**
+     * Attempts to take the lock for {@code key} by inserting a lock row owned by
+     * {@link MysqlRegistryConstant#LOCK_OWNER}.
+     *
+     * @return the stored lock row, or {@code null} when the insert hits an integrity
+     *         constraint violation or reports no generated key
+     * @throws SQLException for any other database failure
+     */
+    public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
+        final String insertSql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
+            stmt.setString(1, key);
+            stmt.setString(2, MysqlRegistryConstant.LOCK_OWNER);
+            stmt.executeUpdate();
+            try (ResultSet generatedKeys = stmt.getGeneratedKeys()) {
+                if (!generatedKeys.next()) {
+                    return null;
+                }
+                // Re-read the row so the caller gets the timestamps the database stored.
+                return getLockById(generatedKeys.getLong(1));
+            }
+        } catch (SQLIntegrityConstraintViolationException e) {
+            // duplicate exception
+            return null;
+        }
+    }
+
+    /**
+     * Loads the lock row with the given id.
+     *
+     * @return the lock row, or {@code null} when no row has that id
+     * @throws SQLException if the query fails
+     */
+    public MysqlRegistryLock getLockById(long lockId) throws SQLException {
+        final String querySql =
+            "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(querySql)) {
+            stmt.setLong(1, lockId);
+            try (ResultSet rows = stmt.executeQuery()) {
+                if (!rows.next()) {
+                    return null;
+                }
+                return MysqlRegistryLock.builder()
+                    .id(rows.getLong("id"))
+                    .key(rows.getString("key"))
+                    .lockOwner(rows.getString("lock_owner"))
+                    .lastTerm(rows.getTimestamp("last_term"))
+                    .lastUpdateTime(rows.getTimestamp("last_update_time"))
+                    .createTime(rows.getTimestamp("create_time"))
+                    .build();
+            }
+        }
+    }
+
+    /**
+     * Releases a lock by deleting its row.
+     *
+     * @return {@code true} when at least one row was deleted
+     * @throws SQLException if the delete fails
+     */
+    public boolean releaseLock(long lockId) throws SQLException {
+        final String deleteSql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?";
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
+            stmt.setLong(1, lockId);
+            int deletedRows = stmt.executeUpdate();
+            return deletedRows > 0;
+        }
+    }
+
+ public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds)
throws SQLException {
+ String sql = "update t_ds_mysql_registry_data set `last_update_time` =
current_timestamp() where `id` IN (?)";
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ Array idArray = connection.createArrayOf("bigint",
ephemeralDateIds.toArray());
+ preparedStatement.setArray(1, idArray);
+ return preparedStatement.executeUpdate() > 0;
+ }
+ }
+
+ public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
+ String sql = "update t_ds_mysql_registry_lock set `last_term` =
current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ Array idArray = connection.createArrayOf("bigint",
lockIds.toArray());
+ preparedStatement.setArray(1, idArray);
+ return preparedStatement.executeUpdate() > 0;
+ }
+ }
+
+    /**
+     * Closes the connection pool unless it is already closed.
+     */
+    @Override
+    public void close() throws Exception {
+        if (dataSource.isClosed()) {
+            return;
+        }
+        dataSource.close();
+    }
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
new file mode 100644
index 0000000000..fa81471898
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import
org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Collection;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * This is one of the implementation of {@link Registry}, with this
implementation, you need to rely on mysql database to
+ * store the DolphinScheduler master/worker's metadata and do the server
registry/unRegistry.
+ */
+@Component
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue =
"mysql")
+public class MysqlRegistry implements Registry {
+
+ private static Logger LOGGER =
LoggerFactory.getLogger(MysqlRegistry.class);
+
+ private final EphemeralDateManager ephemeralDateManager;
+ private final SubscribeDataManager subscribeDataManager;
+ private final RegistryLockManager registryLockManager;
+ private final MysqlOperator mysqlOperator;
+
+ public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) {
+ this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
+ mysqlOperator.clearExpireLock();
+ mysqlOperator.clearExpireEphemeralDate();
+ this.ephemeralDateManager = new
EphemeralDateManager(mysqlRegistryProperties, mysqlOperator);
+ this.subscribeDataManager = new
SubscribeDataManager(mysqlRegistryProperties, mysqlOperator);
+ this.registryLockManager = new
RegistryLockManager(mysqlRegistryProperties, mysqlOperator);
+ LOGGER.info("Initialize Mysql Registry...");
+ }
+
+ @PostConstruct
+ public void start() {
+ LOGGER.info("Starting Mysql Registry...");
+ // start a mysql connect check
+ ephemeralDateManager.start();
+ subscribeDataManager.start();
+ registryLockManager.start();
+ LOGGER.info("Started Mysql Registry...");
+ }
+
+ @Override
+ public boolean subscribe(String path, SubscribeListener listener) {
+ // new a schedule thread to query the path, if the path
+ subscribeDataManager.addListener(path, listener);
+ return true;
+ }
+
+ @Override
+ public void unsubscribe(String path) {
+ subscribeDataManager.removeListener(path);
+ }
+
+ @Override
+ public void addConnectionStateListener(ConnectionListener listener) {
+ // check the current connection
+ ephemeralDateManager.addConnectionListener(listener);
+ }
+
+ @Override
+ public String get(String key) {
+ // get the key value
+ return subscribeDataManager.getData(key);
+ }
+
+ @Override
+ public void put(String key, String value, boolean deleteOnDisconnect) {
+ try {
+ if (deleteOnDisconnect) {
+ // when put a ephemeralData will new a scheduler thread to
update it
+ ephemeralDateManager.insertOrUpdateEphemeralData(key, value);
+ } else {
+ mysqlOperator.insertOrUpdatePersistentData(key, value);
+ }
+ } catch (Exception ex) {
+ throw new RegistryException(String.format("put key:%s, value:%s
error", key, value), ex);
+ }
+ }
+
+ @Override
+ public void delete(String key) {
+ try {
+ mysqlOperator.deleteEphemeralData(key);
+ mysqlOperator.deletePersistentData(key);
+ } catch (Exception e) {
+ throw new RegistryException(String.format("Delete key: %s error",
key), e);
+ }
+ }
+
+ @Override
+ public Collection<String> children(String key) {
+ try {
+ return mysqlOperator.getChildren(key);
+ } catch (SQLException e) {
+ throw new RegistryException(String.format("Get key: %s children
error", key), e);
+ }
+ }
+
+ @Override
+ public boolean exists(String key) {
+ try {
+ return mysqlOperator.existKey(key);
+ } catch (Exception e) {
+ throw new RegistryException(String.format("Check key: %s exist
error", key), e);
+ }
+ }
+
+ @Override
+ public boolean acquireLock(String key) {
+ try {
+ registryLockManager.acquireLock(key);
+ return true;
+ } catch (RegistryException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RegistryException(String.format("Acquire lock: %s
error", key), e);
+ }
+ }
+
+ @Override
+ public boolean releaseLock(String key) {
+ registryLockManager.releaseLock(key);
+ return true;
+ }
+
+ @Override
+ public Duration getSessionTimeout() {
+ throw new UnsupportedOperationException("Not support session timeout
at Mysql Registry");
+ }
+
+ @Override
+ public void close() {
+ LOGGER.info("Closing Mysql Registry...");
+ // remove the current Ephemeral node, if can connect to mysql
+ try (EphemeralDateManager closed1 = ephemeralDateManager;
+ SubscribeDataManager close2 = subscribeDataManager;
+ RegistryLockManager close3 = registryLockManager;
+ MysqlOperator closed4 = mysqlOperator) {
+ } catch (Exception e) {
+ LOGGER.error("Close Mysql Registry error", e);
+ }
+ LOGGER.info("Closed Mysql Registry...");
+ }
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
new file mode 100644
index 0000000000..1482560b7c
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Constants shared by the MySQL registry plugin.
+ */
+@UtilityClass
+public final class MysqlRegistryConstant {
+
+    // NOTE(review): presumably the wait in milliseconds between lock acquire attempts —
+    // confirm against the lock manager that uses it.
+    public static final long LOCK_ACQUIRE_INTERVAL = 1_000L;
+
+    /**
+     * Identifier of this process in the form {@code <host>_<processId>}.
+     */
+    public static final String LOCK_OWNER = NetUtils.getHost() + "_" + OSUtils.getProcessID();
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
new file mode 100644
index 0000000000..d80b8ead0d
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import java.time.Duration;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import com.zaxxer.hikari.HikariConfig;
+
+import lombok.Data;
+
+/**
+ * Configuration properties of the MySQL registry, bound from the {@code registry} prefix.
+ * Only active when the property {@code registry.type} has the value {@code mysql}.
+ */
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue =
"mysql")
+@ConfigurationProperties(prefix = "registry")
+public class MysqlRegistryProperties {
+
+ /**
+ * Interval at which the ephemeral data / lock terms are refreshed. Defaults to 2 seconds.
+ */
+ private Duration termRefreshInterval = Duration.ofSeconds(2);
+ /**
+ * Used to calculate the expire time of the ephemeral data / lock. Defaults to 3.
+ * NOTE(review): presumably the data/lock expires once this many consecutive term refreshes
+ * have been missed; the calculation is not in this class, confirm against MysqlOperator.
+ */
+ private int termExpireTimes = 3;
+ /**
+ * Hikari connection pool configuration. No default value is assigned here.
+ */
+ private HikariConfig hikariConfig;
+
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
new file mode 100644
index 0000000000..657c80b716
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.model;
+
+/**
+ * Kind of a registry data entry, together with the integer code exposed by {@link #getTypeValue()}.
+ */
+public enum DataType {
+
+    EPHEMERAL(1),
+    PERSISTENT(2);
+
+    /** Integer code bound to this constant. */
+    private final int typeValue;
+
+    DataType(final int code) {
+        typeValue = code;
+    }
+
+    /**
+     * @return the integer code bound to this constant
+     */
+    public int getTypeValue() {
+        return this.typeValue;
+    }
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
new file mode 100644
index 0000000000..e3045881c5
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.model;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * One registry data record: key, data, type and timestamps.
+ * NOTE(review): the fields look like a 1:1 image of the t_ds_mysql_registry_data columns;
+ * confirm the mapping in MysqlOperator.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MysqlRegistryData {
+
+ private long id;
+ // registry key; the subscribe check task matches it against subscribed paths by prefix
+ private String key;
+ private String data;
+ // NOTE(review): presumably a DataType#getTypeValue() code (1 ephemeral, 2 persistent), confirm
+ private int type;
+ private Date createTime;
+ // compared between two polls by the subscribe check task to detect an update
+ private Date lastUpdateTime;
+
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
new file mode 100644
index 0000000000..79d718cd4f
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.model;
+
+import java.util.Date;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * One registry lock record; the lock manager keeps the instances it holds keyed by lock key.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MysqlRegistryLock {
+
+ private long id;
+ /**
+ * The lock key.
+ */
+ private String key;
+ /**
+ * The owner of the lock.
+ * NOTE(review): presumably MysqlRegistryConstant.LOCK_OWNER (host_processId), confirm in MysqlOperator.
+ */
+ private String lockOwner;
+ /**
+ * The last term. If (currentTime - lastTerm) exceeds the term expire time, the lock is expired.
+ */
+ private Date lastTerm;
+ /**
+ * The lock last update time.
+ */
+ private Date lastUpdateTime;
+ /**
+ * The lock create time.
+ */
+ private Date createTime;
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
new file mode 100644
index 0000000000..7e8bc53882
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.task;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This thread is used to check the connect state to mysql.
+ */
+public class EphemeralDateManager implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EphemeralDateManager.class);
+
+ private final MysqlOperator mysqlOperator;
+ private final MysqlRegistryProperties registryProperties;
+ private final List<ConnectionListener> connectionListeners =
Collections.synchronizedList(new ArrayList<>());
+ private final Set<Long> ephemeralDateIds = Collections.synchronizedSet(new
HashSet<>());
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ public EphemeralDateManager(MysqlRegistryProperties registryProperties,
MysqlOperator mysqlOperator) {
+ this.registryProperties = registryProperties;
+ this.mysqlOperator = checkNotNull(mysqlOperator);
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(
+ 1,
+ new
ThreadFactoryBuilder().setNameFormat("EphemeralDateTermRefreshThread").setDaemon(true).build());
+ }
+
+ public void start() {
+ this.scheduledExecutorService.scheduleWithFixedDelay(
+ new EphemeralDateTermRefreshTask(mysqlOperator,
connectionListeners, ephemeralDateIds),
+ registryProperties.getTermRefreshInterval().toMillis(),
+ registryProperties.getTermRefreshInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void addConnectionListener(ConnectionListener connectionListener) {
+ connectionListeners.add(connectionListener);
+ }
+
+ public long insertOrUpdateEphemeralData(String key, String value) throws
SQLException {
+ long ephemeralId = mysqlOperator.insertOrUpdateEphemeralData(key,
value);
+ ephemeralDateIds.add(ephemeralId);
+ return ephemeralId;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ ephemeralDateIds.clear();
+ connectionListeners.clear();
+ scheduledExecutorService.shutdownNow();
+ for (Long ephemeralDateId : ephemeralDateIds) {
+ mysqlOperator.deleteEphemeralData(ephemeralDateId);
+ }
+ }
+
+ // Use this task to refresh ephemeral term and check the connect state.
+ static class EphemeralDateTermRefreshTask implements Runnable {
+ private final List<ConnectionListener> connectionListeners;
+ private final Set<Long> ephemeralDateIds;
+ private final MysqlOperator mysqlOperator;
+ private ConnectionState connectionState;
+
+ private EphemeralDateTermRefreshTask(MysqlOperator mysqlOperator,
+ List<ConnectionListener>
connectionListeners,
+ Set<Long> ephemeralDateIds) {
+ this.mysqlOperator = checkNotNull(mysqlOperator);
+ this.connectionListeners = checkNotNull(connectionListeners);
+ this.ephemeralDateIds = checkNotNull(ephemeralDateIds);
+ }
+
+ @Override
+ public void run() {
+ try {
+ ConnectionState currentConnectionState = getConnectionState();
+ if (currentConnectionState == connectionState) {
+ // no state change
+ return;
+ }
+
+ if (connectionState == ConnectionState.CONNECTED) {
+ if (currentConnectionState ==
ConnectionState.DISCONNECTED) {
+ connectionState = ConnectionState.DISCONNECTED;
+ triggerListener(ConnectionState.DISCONNECTED);
+ }
+ } else if (connectionState == ConnectionState.DISCONNECTED) {
+ if (currentConnectionState == ConnectionState.CONNECTED) {
+ connectionState = ConnectionState.CONNECTED;
+ triggerListener(ConnectionState.RECONNECTED);
+ }
+ } else if (connectionState == null) {
+ connectionState = currentConnectionState;
+ triggerListener(connectionState);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Mysql Registry connect state check task execute
failed", e);
+ connectionState = ConnectionState.DISCONNECTED;
+ triggerListener(ConnectionState.DISCONNECTED);
+ }
+ }
+
+ private ConnectionState getConnectionState() {
+ try {
+ if (ephemeralDateIds.isEmpty()) {
+ mysqlOperator.healthCheck();
+ } else {
+ updateEphemeralDateTerm();
+ }
+ mysqlOperator.clearExpireEphemeralDate();
+ return ConnectionState.CONNECTED;
+ } catch (Exception ex) {
+ return ConnectionState.DISCONNECTED;
+ }
+ }
+
+ private void updateEphemeralDateTerm() throws SQLException {
+ if (!mysqlOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
+ LOGGER.warn("Update mysql registry ephemeral data: {} term
error", ephemeralDateIds);
+ }
+ }
+
+ private void triggerListener(ConnectionState connectionState) {
+ for (ConnectionListener connectionListener : connectionListeners) {
+ connectionListener.onUpdate(connectionState);
+ }
+ }
+ }
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
new file mode 100644
index 0000000000..478727545b
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.task;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryConstant;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+
+public class RegistryLockManager implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(RegistryLockManager.class);
+
+ private final MysqlOperator mysqlOperator;
+ private final MysqlRegistryProperties registryProperties;
+ private final Map<String, MysqlRegistryLock> lockHoldMap;
+ private final ScheduledExecutorService lockTermUpdateThreadPool;
+
+ public RegistryLockManager(MysqlRegistryProperties registryProperties,
MysqlOperator mysqlOperator) {
+ this.registryProperties = registryProperties;
+ this.mysqlOperator = mysqlOperator;
+ this.lockHoldMap = new ConcurrentHashMap<>();
+ this.lockTermUpdateThreadPool =
Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("MysqlRegistryLockTermRefreshThread").setDaemon(true).build());
+ }
+
+ public void start() {
+ lockTermUpdateThreadPool.scheduleWithFixedDelay(
+ new LockTermRefreshTask(lockHoldMap, mysqlOperator),
+ registryProperties.getTermRefreshInterval().toMillis(),
+ registryProperties.getTermRefreshInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Acquire the lock, if cannot get the lock will await.
+ */
+ public void acquireLock(String lockKey) throws RegistryException {
+ // maybe we can use the computeIf absent
+ lockHoldMap.computeIfAbsent(lockKey, key -> {
+ MysqlRegistryLock mysqlRegistryLock;
+ try {
+ while ((mysqlRegistryLock =
mysqlOperator.tryToAcquireLock(lockKey)) == null) {
+ logger.debug("Acquire the lock {} failed try again", key);
+ // acquire failed, wait and try again
+
ThreadUtils.sleep(MysqlRegistryConstant.LOCK_ACQUIRE_INTERVAL);
+ }
+ } catch (SQLException e) {
+ throw new RegistryException("Acquire the lock error", e);
+ }
+ return mysqlRegistryLock;
+ });
+ }
+
+ public void releaseLock(String lockKey) {
+ MysqlRegistryLock mysqlRegistryLock = lockHoldMap.get(lockKey);
+ if (mysqlRegistryLock != null) {
+ try {
+ // the lock is unExit
+ mysqlOperator.releaseLock(mysqlRegistryLock.getId());
+ lockHoldMap.remove(lockKey);
+ } catch (SQLException e) {
+ throw new RegistryException(String.format("Release lock: %s
error", lockKey), e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ lockTermUpdateThreadPool.shutdownNow();
+ for (Map.Entry<String, MysqlRegistryLock> lockEntry :
lockHoldMap.entrySet()) {
+ releaseLock(lockEntry.getKey());
+ }
+ }
+
+ /**
+ * This task is used to refresh the lock held by the current server.
+ */
+ @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+ static class LockTermRefreshTask implements Runnable {
+ private final Map<String, MysqlRegistryLock> lockHoldMap;
+ private final MysqlOperator mysqlOperator;
+
+ public void run() {
+ try {
+ if (lockHoldMap.isEmpty()) {
+ return;
+ }
+ List<Long> lockIds = lockHoldMap.values()
+ .stream()
+ .map(MysqlRegistryLock::getId)
+ .collect(Collectors.toList());
+ if (!mysqlOperator.updateLockTerm(lockIds)) {
+ logger.warn("Update the lock: {} term failed.", lockIds);
+ }
+ mysqlOperator.clearExpireLock();
+ } catch (Exception e) {
+ logger.error("Update lock term error", e);
+ }
+ }
+ }
+}
+
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
new file mode 100644
index 0000000000..4c8d7190f3
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.task;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
+import
org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Used to refresh if the subscribe path has been changed.
+ */
+public class SubscribeDataManager implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SubscribeDataManager.class);
+
+ private final MysqlOperator mysqlOperator;
+ private final MysqlRegistryProperties registryProperties;
+ private final Map<String, List<SubscribeListener>> dataSubScribeMap = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService dataSubscribeCheckThreadPool;
+ private final Map<String, MysqlRegistryData> mysqlRegistryDataMap = new
ConcurrentHashMap<>();
+
+ public SubscribeDataManager(MysqlRegistryProperties registryProperties,
MysqlOperator mysqlOperator) {
+ this.registryProperties = registryProperties;
+ this.mysqlOperator = mysqlOperator;
+ this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
+ 1,
+ new
ThreadFactoryBuilder().setNameFormat("MysqlRegistrySubscribeDataCheckThread").setDaemon(true).build());
+ }
+
+ public void start() {
+ dataSubscribeCheckThreadPool.scheduleWithFixedDelay(
+ new RegistrySubscribeDataCheckTask(dataSubScribeMap,
mysqlOperator, mysqlRegistryDataMap),
+ registryProperties.getTermRefreshInterval().toMillis(),
+ registryProperties.getTermRefreshInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void addListener(String path, SubscribeListener subscribeListener) {
+ dataSubScribeMap.computeIfAbsent(path, k -> new
ArrayList<>()).add(subscribeListener);
+ }
+
+ public void removeListener(String path) {
+ dataSubScribeMap.remove(path);
+ }
+
+ public String getData(String path) {
+ MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMap.get(path);
+ if (mysqlRegistryData == null) {
+ return null;
+ }
+ return mysqlRegistryData.getData();
+ }
+
+ @Override
+ public void close() {
+ dataSubscribeCheckThreadPool.shutdownNow();
+ dataSubScribeMap.clear();
+ }
+
+ @RequiredArgsConstructor
+ static class RegistrySubscribeDataCheckTask implements Runnable {
+
+ private final Map<String, List<SubscribeListener>> dataSubScribeMap;
+ private final MysqlOperator mysqlOperator;
+ private final Map<String, MysqlRegistryData> mysqlRegistryDataMap;
+
+ @Override
+ public void run() {
+ // query the full data from database, and update the
mysqlRegistryDataMap
+ try {
+ Map<String, MysqlRegistryData> currentMysqlDataMap =
mysqlOperator.queryAllMysqlRegistryData()
+ .stream()
+ .collect(Collectors.toMap(MysqlRegistryData::getKey,
Function.identity()));
+ // find the different
+ List<MysqlRegistryData> addedData = new ArrayList<>();
+ List<MysqlRegistryData> deletedData = new ArrayList<>();
+ List<MysqlRegistryData> updatedData = new ArrayList<>();
+ for (Map.Entry<String, MysqlRegistryData> entry :
currentMysqlDataMap.entrySet()) {
+ MysqlRegistryData newData = entry.getValue();
+ MysqlRegistryData oldData =
mysqlRegistryDataMap.get(entry.getKey());
+ if (oldData == null) {
+ addedData.add(newData);
+ } else {
+ if
(!entry.getValue().getLastUpdateTime().equals(oldData.getLastUpdateTime())) {
+ updatedData.add(newData);
+ }
+ }
+ }
+ for (Map.Entry<String, MysqlRegistryData> entry :
mysqlRegistryDataMap.entrySet()) {
+ if (!currentMysqlDataMap.containsKey(entry.getKey())) {
+ deletedData.add(entry.getValue());
+ }
+ }
+ mysqlRegistryDataMap.clear();
+ mysqlRegistryDataMap.putAll(currentMysqlDataMap);
+ // trigger listener
+ for (Map.Entry<String, List<SubscribeListener>> entry :
dataSubScribeMap.entrySet()) {
+ String subscribeKey = entry.getKey();
+ List<SubscribeListener> subscribeListeners =
entry.getValue();
+ triggerListener(addedData, subscribeKey,
subscribeListeners, Event.Type.ADD);
+ triggerListener(deletedData, subscribeKey,
subscribeListeners, Event.Type.REMOVE);
+ triggerListener(updatedData, subscribeKey,
subscribeListeners, Event.Type.UPDATE);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Query data from mysql registry error");
+ }
+ }
+
+ private void triggerListener(List<MysqlRegistryData> dataList,
+ String subscribeKey,
+ List<SubscribeListener>
subscribeListeners,
+ Event.Type type) {
+ for (MysqlRegistryData data : dataList) {
+ if (data.getKey().startsWith(subscribeKey)) {
+ subscribeListeners.forEach(subscribeListener ->
+ subscribeListener.notify(new Event(data.getKey(),
data.getKey(), data.getData(), type)));
+ }
+ }
+ }
+
+ }
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
new file mode 100644
index 0000000000..6fc936fc6e
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+-- Initialization script of the MySQL registry: creates the registry data table and the lock table.
+-- WARNING: both tables are dropped first, so running this script discards any existing registry state.
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- Registry data, one row per key; `key` is unique.
+DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
+CREATE TABLE `t_ds_mysql_registry_data`
+(
+ `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary
key',
+ `key` varchar(200) NOT NULL COMMENT 'key, like zookeeper node
path',
+ `data` varchar(200) NOT NULL COMMENT 'data, like zookeeper
node value',
+ `type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2:
persistent node',
+ `last_update_time` timestamp NULL COMMENT 'last update time',
+ `create_time` timestamp NULL COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ unique (`key`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8;
+
+
+-- Registry locks, one row per lock path; the unique `key` allows at most one row per lock path.
+DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
+CREATE TABLE `t_ds_mysql_registry_lock`
+(
+ `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary
key',
+ `key` varchar(200) NOT NULL COMMENT 'lock path',
+ `lock_owner` varchar(100) NOT NULL COMMENT 'the lock owner,
ip_processId',
+ `last_term` timestamp NOT NULL COMMENT 'last term time',
+ `last_update_time` timestamp NULL COMMENT 'last update time',
+ `create_time` timestamp NULL COMMENT 'lock create time',
+ PRIMARY KEY (`id`),
+ unique (`key`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8;
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index e0f818e665..ba4131401b 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -23,8 +23,6 @@ import
org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.RegistryProperties;
-import
org.apache.dolphinscheduler.registry.api.RegistryProperties.ZookeeperProperties;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.framework.CuratorFramework;
@@ -59,14 +57,14 @@ import com.google.common.base.Strings;
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue =
"zookeeper")
public final class ZookeeperRegistry implements Registry {
- private final ZookeeperProperties properties;
+ private final ZookeeperRegistryProperties.ZookeeperProperties properties;
private final CuratorFramework client;
private final Map<String, TreeCache> treeCacheMap = new
ConcurrentHashMap<>();
private static final ThreadLocal<Map<String, InterProcessMutex>>
threadLocalLockMap = new ThreadLocal<>();
- public ZookeeperRegistry(RegistryProperties registryProperties) {
+ public ZookeeperRegistry(ZookeeperRegistryProperties registryProperties) {
properties = registryProperties.getZookeeper();
final ExponentialBackoffRetry retryPolicy = new
ExponentialBackoffRetry(
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
similarity index 75%
rename from
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java
rename to
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
index dbe9ac1046..84026a34f0 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
@@ -1,44 +1,34 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-package org.apache.dolphinscheduler.registry.api;
+package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import java.time.Duration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue =
"zookeeper")
@ConfigurationProperties(prefix = "registry")
-public class RegistryProperties {
- private Type type;
+public class ZookeeperRegistryProperties {
private ZookeeperProperties zookeeper = new ZookeeperProperties();
- public Type getType() {
- return type;
- }
-
- public void setType(Type type) {
- this.type = type;
- }
-
public ZookeeperProperties getZookeeper() {
return zookeeper;
}
@@ -47,10 +37,6 @@ public class RegistryProperties {
this.zookeeper = zookeeper;
}
- public enum Type {
- ZOOKEEPER
- }
-
public static final class ZookeeperProperties {
private String namespace;
private String connectString;
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
index 312153addc..4665f5011a 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.registry.api.Event;
-import org.apache.dolphinscheduler.registry.api.RegistryProperties;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.test.TestingServer;
@@ -48,7 +47,7 @@ public class ZookeeperRegistryTest {
public void before() throws Exception {
server = new TestingServer(true);
- RegistryProperties p = new RegistryProperties();
+ ZookeeperRegistryProperties p = new ZookeeperRegistryProperties();
p.getZookeeper().setConnectString(server.getConnectString());
registry = new ZookeeperRegistry(p);
registry.start();
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
index 1defc2d294..e81cc076eb 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
@@ -22,8 +22,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>dolphinscheduler-registry</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-registry-plugins</artifactId>
@@ -32,5 +32,6 @@
<modules>
<module>dolphinscheduler-registry-zookeeper</module>
+ <module>dolphinscheduler-registry-mysql</module>
</modules>
</project>
diff --git a/dolphinscheduler-registry/pom.xml
b/dolphinscheduler-registry/pom.xml
index 79a44079f8..89e0565924 100644
--- a/dolphinscheduler-registry/pom.xml
+++ b/dolphinscheduler-registry/pom.xml
@@ -48,5 +48,6 @@
<modules>
<module>dolphinscheduler-registry-api</module>
<module>dolphinscheduler-registry-plugins</module>
+ <module>dolphinscheduler-registry-all</module>
</modules>
</project>
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index ee865769fa..9ef5e94cd0 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -59,7 +59,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+ <artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>
diff --git a/pom.xml b/pom.xml
index f8a325ee77..5d493de04c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,6 +131,7 @@
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
<aws.sdk.version>1.12.160</aws.sdk.version>
<joda-time.version>2.10.13</joda-time.version>
+ <lombok.version>1.18.20</lombok.version>
<docker.hub>apache</docker.hub>
<docker.repo>${project.name}</docker.repo>
<docker.tag>${project.version}</docker.tag>
@@ -273,6 +274,11 @@
<artifactId>dolphinscheduler-registry-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
@@ -385,6 +391,11 @@
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-mysql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
@@ -1222,6 +1233,12 @@
<version>${auto-service.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>