This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit fa78b95a76f80e610b2680b60f91df6b7f54cebb
Author: Wenjun Ruan <[email protected]>
AuthorDate: Sat Dec 3 11:51:01 2022 +0800

    cherry-pick Use bigint represent term in mysql registry to avoid time 
inaccurate #13082
---
 .../plugin/registry/mysql/MysqlOperator.java       | 183 ++++++++++++++++-----
 .../registry/mysql/model/MysqlRegistryData.java    |   1 +
 .../registry/mysql/model/MysqlRegistryLock.java    |   2 +-
 .../src/main/resources/mysql_registry_init.sql     |  25 +--
 4 files changed, 153 insertions(+), 58 deletions(-)

diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
index d5dd507f92..d372263d2b 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
@@ -30,10 +30,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.Statement;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
+
+import lombok.NonNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,10 +73,12 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public List<MysqlRegistryData> queryAllMysqlRegistryData() throws 
SQLException {
-        String sql = "select id, `key`, data, type, create_time, 
last_update_time from t_ds_mysql_registry_data";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
-             ResultSet resultSet = preparedStatement.executeQuery()) {
+        String sql =
+                "select id, `key`, data, type, last_term, create_time, 
last_update_time from t_ds_mysql_registry_data";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
+                ResultSet resultSet = preparedStatement.executeQuery()) {
             List<MysqlRegistryData> result = new 
ArrayList<>(resultSet.getFetchSize());
             while (resultSet.next()) {
                 MysqlRegistryData mysqlRegistryData = 
MysqlRegistryData.builder()
@@ -81,6 +86,7 @@ public class MysqlOperator implements AutoCloseable {
                         .key(resultSet.getString("key"))
                         .data(resultSet.getString("data"))
                         .type(resultSet.getInt("type"))
+                        .lastTerm(resultSet.getLong("last_term"))
                         .createTime(resultSet.getTimestamp("create_time"))
                         
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
                         .build();
@@ -90,35 +96,97 @@ public class MysqlOperator implements AutoCloseable {
         }
     }
 
-    public long insertOrUpdateEphemeralData(String key, String value) throws 
SQLException {
-        String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, 
create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, 
current_timestamp)" +
-                "ON DUPLICATE KEY UPDATE data=?, 
last_update_time=current_timestamp";
-        // put a ephemeralData
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+    public Long insertOrUpdateEphemeralData(String key, String value) throws 
SQLException {
+        Optional<MysqlRegistryData> mysqlRegistryDataOptional = 
selectByKey(key);
+        if (mysqlRegistryDataOptional.isPresent()) {
+            long id = mysqlRegistryDataOptional.get().getId();
+            if (!updateValueById(id, value)) {
+                throw new SQLException(String.format("update registry value 
failed, key: %s, value: %s", key, value));
+            }
+            return id;
+        }
+        MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
+                .key(key)
+                .data(value)
+                .type(DataType.EPHEMERAL.getTypeValue())
+                .lastTerm(System.currentTimeMillis())
+                .build();
+        return insertMysqlRegistryData(mysqlRegistryData);
+    }
+
+    private Optional<MysqlRegistryData> selectByKey(@NonNull String key) 
throws SQLException {
+        String sql =
+                "select id, `key`, data, type, create_time, last_update_time 
from t_ds_mysql_registry_data where `key` = ?";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
             preparedStatement.setString(1, key);
-            preparedStatement.setString(2, value);
-            preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
-            preparedStatement.setString(4, value);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (resultSet.next()) {
+                    return Optional.of(
+                            MysqlRegistryData.builder()
+                                    .id(resultSet.getLong("id"))
+                                    .key(resultSet.getString("key"))
+                                    .data(resultSet.getString("data"))
+                                    .type(resultSet.getInt("type"))
+                                    
.createTime(resultSet.getTimestamp("create_time"))
+                                    
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
+                                    .build());
+                }
+                return Optional.empty();
+            }
+        }
+    }
+
+    private boolean updateValueById(long id, String value) throws SQLException 
{
+        String sql = "update t_ds_mysql_registry_data set data = ?, last_term 
= ? where id = ?";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, value);
+            preparedStatement.setLong(2, System.currentTimeMillis());
+            preparedStatement.setLong(3, id);
+            return preparedStatement.executeUpdate() > 0;
+        }
+    }
+
+    private long insertMysqlRegistryData(@NonNull MysqlRegistryData 
mysqlRegistryData) throws SQLException {
+        String sql =
+                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, 
last_term) VALUES (?, ?, ?, ?)";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement =
+                        connection.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS)) {
+            preparedStatement.setString(1, mysqlRegistryData.getKey());
+            preparedStatement.setString(2, mysqlRegistryData.getData());
+            preparedStatement.setInt(3, mysqlRegistryData.getType());
+            preparedStatement.setLong(4, mysqlRegistryData.getLastTerm());
             int insertCount = preparedStatement.executeUpdate();
             ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
             if (insertCount < 1 || !generatedKeys.next()) {
-                throw new SQLException("Insert or update ephemeral data 
error");
+                throw new SQLException("Insert ephemeral data error, data: " + 
mysqlRegistryData);
             }
             return generatedKeys.getLong(1);
         }
     }
 
     public long insertOrUpdatePersistentData(String key, String value) throws 
SQLException {
-        String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, 
create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, 
current_timestamp)" +
-                "ON DUPLICATE KEY UPDATE data=?, 
last_update_time=current_timestamp";
+        String sql =
+                "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, 
last_term) VALUES (?, ?, ?, ?)"
+                        +
+                        "ON DUPLICATE KEY UPDATE data=?, last_term=?";
         // put a persistent Data
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement =
+                        connection.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS)) {
+            long term = System.currentTimeMillis();
             preparedStatement.setString(1, key);
             preparedStatement.setString(2, value);
             preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
-            preparedStatement.setString(4, value);
+            preparedStatement.setLong(4, term);
+            preparedStatement.setString(5, value);
+            preparedStatement.setLong(6, term);
             int insertCount = preparedStatement.executeUpdate();
             ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
             if (insertCount < 1 || !generatedKeys.next()) {
@@ -159,10 +227,10 @@ public class MysqlOperator implements AutoCloseable {
 
     public void clearExpireLock() {
         String sql = "delete from t_ds_mysql_registry_lock where last_term < 
?";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            preparedStatement.setTimestamp(1,
-                    new Timestamp(System.currentTimeMillis() - 
expireTimeWindow));
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setLong(1, System.currentTimeMillis() - 
expireTimeWindow);
             int i = preparedStatement.executeUpdate();
             if (i > 0) {
                 logger.info("Clear expire lock, size: {}", i);
@@ -173,10 +241,11 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public void clearExpireEphemeralDate() {
-        String sql = "delete from t_ds_mysql_registry_data where 
last_update_time < ? and type = ?";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            preparedStatement.setTimestamp(1, new 
Timestamp(System.currentTimeMillis() - expireTimeWindow));
+        String sql = "delete from t_ds_mysql_registry_data where last_term < ? 
and type = ?";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setLong(1, System.currentTimeMillis() - 
expireTimeWindow);
             preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
             int i = preparedStatement.executeUpdate();
             if (i > 0) {
@@ -188,9 +257,11 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public MysqlRegistryData getData(String key) throws SQLException {
-        String sql = "SELECT id, `key`, data, type, create_time, 
last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+        String sql =
+                "SELECT id, `key`, data, type, last_term, create_time, 
last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
             preparedStatement.setString(1, key);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 if (!resultSet.next()) {
@@ -201,6 +272,7 @@ public class MysqlOperator implements AutoCloseable {
                         .key(resultSet.getString("key"))
                         .data(resultSet.getString("data"))
                         .type(resultSet.getInt("type"))
+                        .lastTerm(resultSet.getLong("last_term"))
                         .createTime(resultSet.getTimestamp("create_time"))
                         
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
                         .build();
@@ -244,11 +316,15 @@ public class MysqlOperator implements AutoCloseable {
      * Try to acquire the target Lock, if cannot acquire, return null.
      */
     public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
-        String sql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, 
last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, 
current_timestamp, current_timestamp)";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+        String sql =
+                "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, 
last_term) VALUES (?, ?, ?)";
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement =
+                        connection.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS)) {
             preparedStatement.setString(1, key);
             preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
+            preparedStatement.setLong(3, System.currentTimeMillis());
             preparedStatement.executeUpdate();
             try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                 if (resultSet.next()) {
@@ -274,7 +350,7 @@ public class MysqlOperator implements AutoCloseable {
                             .id(resultSet.getLong("id"))
                             .key(resultSet.getString("key"))
                             .lockOwner(resultSet.getString("lock_owner"))
-                            .lastTerm(resultSet.getTimestamp("last_term"))
+                            .lastTerm(resultSet.getLong("last_term"))
                             
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
                             .createTime(resultSet.getTimestamp("create_time"))
                             .build();
@@ -296,21 +372,38 @@ public class MysqlOperator implements AutoCloseable {
     }
 
     public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) 
throws SQLException {
-        String sql = "update t_ds_mysql_registry_data set `last_update_time` = 
current_timestamp() where `id` IN (?)";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            Array idArray = connection.createArrayOf("bigint", 
ephemeralDateIds.toArray());
-            preparedStatement.setArray(1, idArray);
+        StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data 
set `last_term` = ? where `id` IN (");
+        Iterator<Long> iterator = ephemeralDateIds.iterator();
+        for (int i = 0; i < ephemeralDateIds.size(); i++) {
+            sb.append(iterator.next());
+            if (i != ephemeralDateIds.size() - 1) {
+                sb.append(",");
+            }
+        }
+        sb.append(")");
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sb.toString())) {
+            preparedStatement.setLong(1, System.currentTimeMillis());
             return preparedStatement.executeUpdate() > 0;
         }
     }
 
     public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
-        String sql = "update t_ds_mysql_registry_lock set `last_term` = 
current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
-        try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            Array idArray = connection.createArrayOf("bigint", 
lockIds.toArray());
-            preparedStatement.setArray(1, idArray);
+        StringBuilder sb =
+                new StringBuilder("update t_ds_mysql_registry_lock set 
`last_term` = ? where `id` IN (");
+        Iterator<Long> iterator = lockIds.iterator();
+        for (int i = 0; i < lockIds.size(); i++) {
+            sb.append(iterator.next());
+            if (i != lockIds.size() - 1) {
+                sb.append(",");
+            }
+        }
+        sb.append(")");
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(sb.toString())) {
+            preparedStatement.setLong(1, System.currentTimeMillis());
             return preparedStatement.executeUpdate() > 0;
         }
     }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
index e3045881c5..e9ff6c81de 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
@@ -34,6 +34,7 @@ public class MysqlRegistryData {
     private String key;
     private String data;
     private int type;
+    private long lastTerm;
     private Date createTime;
     private Date lastUpdateTime;
 
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
index 79d718cd4f..a86e4afe2c 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
@@ -42,7 +42,7 @@ public class MysqlRegistryLock {
     /**
      * The last term, if the (currentTime - lastTerm) > termExpire time, the 
lock will be expired.
      */
-    private Date lastTerm;
+    private Long lastTerm;
     /**
      * The lock last update time.
      */
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
index 6fc936fc6e..77b33c4cf7 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
@@ -20,12 +20,13 @@ SET FOREIGN_KEY_CHECKS = 0;
 DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
 CREATE TABLE `t_ds_mysql_registry_data`
 (
-    `id`               bigint(11)   NOT NULL AUTO_INCREMENT COMMENT 'primary 
key',
-    `key`              varchar(200) NOT NULL COMMENT 'key, like zookeeper node 
path',
-    `data`             varchar(200) NOT NULL COMMENT 'data, like zookeeper 
node value',
-    `type`             tinyint(4)   NOT NULL COMMENT '1: ephemeral node, 2: 
persistent node',
-    `last_update_time` timestamp    NULL COMMENT 'last update time',
-    `create_time`      timestamp    NULL COMMENT 'create time',
+    `id`               bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary 
key',
+    `key`              varchar(256) NOT NULL COMMENT 'key, like zookeeper node 
path',
+    `data`             text         NOT NULL COMMENT 'data, like zookeeper 
node value',
+    `type`             tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: 
persistent node',
+    `last_term`        bigint       NOT NULL COMMENT 'last term time',
+    `last_update_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 
'create time',
     PRIMARY KEY (`id`),
     unique (`key`)
 ) ENGINE = InnoDB
@@ -35,12 +36,12 @@ CREATE TABLE `t_ds_mysql_registry_data`
 DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
 CREATE TABLE `t_ds_mysql_registry_lock`
 (
-    `id`               bigint(11)   NOT NULL AUTO_INCREMENT COMMENT 'primary 
key',
-    `key`              varchar(200) NOT NULL COMMENT 'lock path',
-    `lock_owner`       varchar(100) NOT NULL COMMENT 'the lock owner, 
ip_processId',
-    `last_term`        timestamp    NOT NULL COMMENT 'last term time',
-    `last_update_time` timestamp    NULL COMMENT 'last update time',
-    `create_time`      timestamp    NULL COMMENT 'lock create time',
+    `id`               bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary 
key',
+    `key`              varchar(256) NOT NULL COMMENT 'lock path',
+    `lock_owner`       varchar(256) NOT NULL COMMENT 'the lock owner, 
ip_processId',
+    `last_term`        bigint       NOT NULL COMMENT 'last term time',
+    `last_update_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 
'create time',
     PRIMARY KEY (`id`),
     unique (`key`)
 ) ENGINE = InnoDB

Reply via email to