This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 91c1b5748d [Improvement-17715][Registry] The registry center's lock 
mechanism supports reentrant acquisition. (#17744)
91c1b5748d is described below

commit 91c1b5748d46271cee69ac4607a484f380776898
Author: Qiong Zhou <[email protected]>
AuthorDate: Mon Dec 1 10:47:32 2025 +0800

    [Improvement-17715][Registry] The registry center's lock mechanism supports 
reentrant acquisition. (#17744)
---
 .../plugin/registry/etcd/EtcdRegistry.java         | 79 ++++++++++++++++------
 .../plugin/registry/RegistryTestCase.java          | 23 +++++++
 .../jdbc/server/JdbcRegistryLockManager.java       | 31 +++++++--
 .../registry/zookeeper/ZookeeperRegistry.java      | 12 +---
 4 files changed, 107 insertions(+), 38 deletions(-)

diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
index 9f1d031131..a5955d3212 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import javax.net.ssl.SSLException;
@@ -81,7 +82,7 @@ public class EtcdRegistry implements Registry {
     public static final String FOLDER_SEPARATOR = "/";
     // save the lock info for thread
     // key:lockKey Value:leaseId
-    private static final ThreadLocal<Map<String, Long>> threadLocalLockMap = 
new ThreadLocal<>();
+    private static final ThreadLocal<Map<String, LockEntry>> 
threadLocalLockMap = new ThreadLocal<>();
 
     private final Map<String, Watch.Watcher> watcherMap = new 
ConcurrentHashMap<>();
 
@@ -297,13 +298,9 @@ public class EtcdRegistry implements Registry {
      * get the lock with a lease
      */
     @Override
-    public boolean acquireLock(String key) {
-        Map<String, Long> leaseIdMap = threadLocalLockMap.get();
-        if (null == leaseIdMap) {
-            leaseIdMap = new HashMap<>();
-            threadLocalLockMap.set(leaseIdMap);
-        }
-        if (leaseIdMap.containsKey(key)) {
+    public boolean acquireLock(String lockKey) {
+        Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
+        if (acquireBasedOnThreadHeldLocks(lockKey, threadHeldLocks)) {
             return true;
         }
 
@@ -315,27 +312,41 @@ public class EtcdRegistry implements Registry {
             // keep the lease
             client.getLeaseClient().keepAlive(leaseId, 
Observers.observer(response -> {
             }));
-            lockClient.lock(byteSequence(key), leaseId).get();
+            lockClient.lock(byteSequence(lockKey), leaseId).get();
 
             // save the leaseId for release Lock
-            leaseIdMap.put(key, leaseId);
+            threadHeldLocks.put(lockKey, new LockEntry(leaseId));
             return true;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RegistryException("etcd get lock error", e);
         } catch (Exception e) {
-            throw new RegistryException("etcd get lock error, lockKey: " + 
key, e);
+            throw new RegistryException("etcd get lock error, lockKey: " + 
lockKey, e);
+        }
+    }
+
+    private static boolean acquireBasedOnThreadHeldLocks(String lockKey, 
Map<String, LockEntry> threadHeldLocks) {
+        LockEntry lockEntry = threadHeldLocks.get(lockKey);
+        if (lockEntry != null) {
+            lockEntry.lockCount.incrementAndGet();
+            return true;
         }
+        return false;
+    }
+
+    private static Map<String, LockEntry> getThreadHeldLocks() {
+        Map<String, LockEntry> lockEntryMap = threadLocalLockMap.get();
+        if (null == lockEntryMap) {
+            lockEntryMap = new HashMap<>();
+            threadLocalLockMap.set(lockEntryMap);
+        }
+        return lockEntryMap;
     }
 
     @Override
     public boolean acquireLock(String key, long timeout) {
-        Map<String, Long> leaseIdMap = threadLocalLockMap.get();
-        if (null == leaseIdMap) {
-            leaseIdMap = new HashMap<>();
-            threadLocalLockMap.set(leaseIdMap);
-        }
-        if (leaseIdMap.containsKey(key)) {
+        Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
+        if (acquireBasedOnThreadHeldLocks(key, threadHeldLocks)) {
             return true;
         }
 
@@ -350,7 +361,7 @@ public class EtcdRegistry implements Registry {
             }));
 
             // save the leaseId for release Lock
-            leaseIdMap.put(key, leaseId);
+            threadHeldLocks.put(key, new LockEntry(leaseId));
             return true;
         } catch (TimeoutException timeoutException) {
             log.debug("Acquire lock: {} in {}/ms timeout", key, timeout);
@@ -369,10 +380,24 @@ public class EtcdRegistry implements Registry {
     @Override
     public boolean releaseLock(String key) {
         try {
-            Long leaseId = threadLocalLockMap.get().get(key);
-            client.getLeaseClient().revoke(leaseId);
-            threadLocalLockMap.get().remove(key);
-            if (threadLocalLockMap.get().isEmpty()) {
+            Map<String, LockEntry> lockEntryMap = threadLocalLockMap.get();
+            if (lockEntryMap == null) {
+                return true;
+            }
+            LockEntry lockEntry = lockEntryMap.get(key);
+            if (lockEntry == null) {
+                return true;
+            }
+            int newLockCount = lockEntry.lockCount.decrementAndGet();
+            if (newLockCount > 0) {
+                return true;
+            }
+            if (newLockCount < 0) {
+                throw new IllegalMonitorStateException("Etcd lock count has 
gone negative for lock: " + key);
+            }
+            client.getLeaseClient().revoke(lockEntry.leaseId);
+            lockEntryMap.remove(key);
+            if (lockEntryMap.isEmpty()) {
                 threadLocalLockMap.remove();
             }
         } catch (Exception e) {
@@ -418,4 +443,14 @@ public class EtcdRegistry implements Registry {
                 .build();
     }
 
+    private static class LockEntry {
+
+        final Long leaseId;
+        final AtomicInteger lockCount = new AtomicInteger(1);
+
+        private LockEntry(Long leaseId) {
+            this.leaseId = leaseId;
+        }
+    }
+
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
index 5db9798d50..b819ef0eee 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
@@ -258,6 +258,29 @@ public abstract class RegistryTestCase<R extends Registry> 
{
         assertThat(acquireResult.get()).isTrue();
     }
 
+    @SneakyThrows
+    @Test
+    public void testReentrantLock() {
+        registry.start();
+        String lockKey = "/lock" + System.nanoTime();
+        assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+        assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+        CompletableFuture<Boolean> acquireResult =
+                CompletableFuture.supplyAsync(() -> 
registry.acquireLock(lockKey, 3000));
+        assertThat(acquireResult.get()).isFalse();
+
+        assertThat(registry.releaseLock(lockKey)).isTrue();
+
+        acquireResult = CompletableFuture.supplyAsync(() -> 
registry.acquireLock(lockKey, 3000));
+        assertThat(acquireResult.get()).isFalse();
+
+        assertThat(registry.releaseLock(lockKey)).isTrue();
+
+        acquireResult = CompletableFuture.supplyAsync(() -> 
registry.acquireLock(lockKey, 3000));
+        assertThat(acquireResult.get()).isTrue();
+    }
+
     public abstract R createRegistry();
 
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
index 3b1acf1d4d..434f5c49b7 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
@@ -24,8 +24,9 @@ import 
org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryLo
 import 
org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryLockRepository;
 
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -42,7 +43,7 @@ public class JdbcRegistryLockManager implements 
IJdbcRegistryLockManager {
     private final JdbcRegistryLockRepository jdbcRegistryLockRepository;
 
     // lockKey -> LockEntry
-    private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new 
HashMap<>();
+    private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new 
ConcurrentHashMap<>();
 
     public JdbcRegistryLockManager(JdbcRegistryProperties 
jdbcRegistryProperties,
                                    JdbcRegistryLockRepository 
jdbcRegistryLockRepository) {
@@ -54,8 +55,7 @@ public class JdbcRegistryLockManager implements 
IJdbcRegistryLockManager {
     public void acquireJdbcRegistryLock(Long clientId, String lockKey) {
         String lockOwner = LockUtils.getLockOwner();
         while (true) {
-            LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
-            if (lockEntry != null && 
lockOwner.equals(lockEntry.getLockOwner())) {
+            if (tryReenterLock(lockKey, lockOwner)) {
                 return;
             }
             JdbcRegistryLockDTO jdbcRegistryLock = 
JdbcRegistryLockDTO.builder()
@@ -85,13 +85,21 @@ public class JdbcRegistryLockManager implements 
IJdbcRegistryLockManager {
         }
     }
 
+    private boolean tryReenterLock(String lockKey, String lockAcquirer) {
+        LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
+        if (lockEntry != null && 
lockAcquirer.equals(lockEntry.getLockOwner())) {
+            lockEntry.lockCount.incrementAndGet();
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long 
timeout) {
         String lockOwner = LockUtils.getLockOwner();
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start <= timeout) {
-            LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
-            if (lockEntry != null && 
lockOwner.equals(lockEntry.getLockOwner())) {
+            if (tryReenterLock(lockKey, lockOwner)) {
                 return true;
             }
             JdbcRegistryLockDTO jdbcRegistryLock = 
JdbcRegistryLockDTO.builder()
@@ -124,14 +132,22 @@ public class JdbcRegistryLockManager implements 
IJdbcRegistryLockManager {
 
     @Override
     public void releaseJdbcRegistryLock(Long clientId, String lockKey) {
+        String lockOwner = LockUtils.getLockOwner();
         LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
-        if (lockEntry == null) {
+        if (lockEntry == null || !lockOwner.equals(lockEntry.getLockOwner())) {
             return;
         }
         if (!clientId.equals(lockEntry.getJdbcRegistryLock().getClientId())) {
             throw new UnsupportedOperationException(
                     "The client " + clientId + " is not the lock owner of the 
lock: " + lockKey);
         }
+        int newLockCount = lockEntry.lockCount.decrementAndGet();
+        if (newLockCount > 0) {
+            return;
+        }
+        if (newLockCount < 0) {
+            throw new IllegalMonitorStateException("Jdbc lock count has gone 
negative for lock: " + lockKey);
+        }
         
jdbcRegistryLockRepository.deleteById(lockEntry.getJdbcRegistryLock().getId());
         jdbcRegistryLockHolderMap.remove(lockKey);
     }
@@ -144,6 +160,7 @@ public class JdbcRegistryLockManager implements 
IJdbcRegistryLockManager {
 
         private String lockKey;
         private String lockOwner;
+        final AtomicInteger lockCount = new AtomicInteger(1);
         private JdbcRegistryLockDTO jdbcRegistryLock;
     }
 }
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index efa9411e6b..9f2d9a984d 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -218,12 +218,6 @@ final class ZookeeperRegistry implements Registry {
         try {
             interProcessMutex =
                     Optional.ofNullable(processMutexMap.get(key)).orElse(new 
InterProcessMutex(client, key));
-            if (interProcessMutex.isAcquiredInThisProcess()) {
-                // Since etcd/jdbc cannot implement a reentrant lock, we need 
to check if the lock is already acquired
-                // If it is already acquired, return true directly
-                // This means you only need to release once when you acquire 
multiple times
-                return true;
-            }
             interProcessMutex.acquire();
             processMutexMap.put(key, interProcessMutex);
             return true;
@@ -250,9 +244,6 @@ final class ZookeeperRegistry implements Registry {
         try {
             interProcessMutex =
                     Optional.ofNullable(processMutexMap.get(key)).orElse(new 
InterProcessMutex(client, key));
-            if (interProcessMutex.isAcquiredInThisProcess()) {
-                return true;
-            }
             if (interProcessMutex.acquire(timeout, MILLISECONDS)) {
                 processMutexMap.put(key, interProcessMutex);
                 return true;
@@ -282,6 +273,9 @@ final class ZookeeperRegistry implements Registry {
         }
         try {
             interProcessMutex.release();
+            if (interProcessMutex.isOwnedByCurrentThread()) {
+                return true;
+            }
             processMutexMap.remove(key);
             if (processMutexMap.isEmpty()) {
                 threadLocalLockMap.remove();

Reply via email to