This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 91c1b5748d [Improvement-17715][Registry] The registry center's lock
mechanism supports reentrant acquisition. (#17744)
91c1b5748d is described below
commit 91c1b5748d46271cee69ac4607a484f380776898
Author: Qiong Zhou <[email protected]>
AuthorDate: Mon Dec 1 10:47:32 2025 +0800
[Improvement-17715][Registry] The registry center's lock mechanism supports
reentrant acquisition. (#17744)
---
.../plugin/registry/etcd/EtcdRegistry.java | 79 ++++++++++++++++------
.../plugin/registry/RegistryTestCase.java | 23 +++++++
.../jdbc/server/JdbcRegistryLockManager.java | 31 +++++++--
.../registry/zookeeper/ZookeeperRegistry.java | 12 +---
4 files changed, 107 insertions(+), 38 deletions(-)
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
index 9f1d031131..a5955d3212 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
@@ -81,7 +82,7 @@ public class EtcdRegistry implements Registry {
public static final String FOLDER_SEPARATOR = "/";
// save the lock info for thread
// key:lockKey Value:leaseId
- private static final ThreadLocal<Map<String, Long>> threadLocalLockMap =
new ThreadLocal<>();
+ private static final ThreadLocal<Map<String, LockEntry>>
threadLocalLockMap = new ThreadLocal<>();
private final Map<String, Watch.Watcher> watcherMap = new
ConcurrentHashMap<>();
@@ -297,13 +298,9 @@ public class EtcdRegistry implements Registry {
* get the lock with a lease
*/
@Override
- public boolean acquireLock(String key) {
- Map<String, Long> leaseIdMap = threadLocalLockMap.get();
- if (null == leaseIdMap) {
- leaseIdMap = new HashMap<>();
- threadLocalLockMap.set(leaseIdMap);
- }
- if (leaseIdMap.containsKey(key)) {
+ public boolean acquireLock(String lockKey) {
+ Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
+ if (acquireBasedOnThreadHeldLocks(lockKey, threadHeldLocks)) {
return true;
}
@@ -315,27 +312,41 @@ public class EtcdRegistry implements Registry {
// keep the lease
client.getLeaseClient().keepAlive(leaseId,
Observers.observer(response -> {
}));
- lockClient.lock(byteSequence(key), leaseId).get();
+ lockClient.lock(byteSequence(lockKey), leaseId).get();
// save the leaseId for release Lock
- leaseIdMap.put(key, leaseId);
+ threadHeldLocks.put(lockKey, new LockEntry(leaseId));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
} catch (Exception e) {
- throw new RegistryException("etcd get lock error, lockKey: " +
key, e);
+ throw new RegistryException("etcd get lock error, lockKey: " +
lockKey, e);
+ }
+ }
+
+ private static boolean acquireBasedOnThreadHeldLocks(String lockKey,
Map<String, LockEntry> threadHeldLocks) {
+ LockEntry lockEntry = threadHeldLocks.get(lockKey);
+ if (lockEntry != null) {
+ lockEntry.lockCount.incrementAndGet();
+ return true;
}
+ return false;
+ }
+
+ private static Map<String, LockEntry> getThreadHeldLocks() {
+ Map<String, LockEntry> lockEntryMap = threadLocalLockMap.get();
+ if (null == lockEntryMap) {
+ lockEntryMap = new HashMap<>();
+ threadLocalLockMap.set(lockEntryMap);
+ }
+ return lockEntryMap;
}
@Override
public boolean acquireLock(String key, long timeout) {
- Map<String, Long> leaseIdMap = threadLocalLockMap.get();
- if (null == leaseIdMap) {
- leaseIdMap = new HashMap<>();
- threadLocalLockMap.set(leaseIdMap);
- }
- if (leaseIdMap.containsKey(key)) {
+ Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
+ if (acquireBasedOnThreadHeldLocks(key, threadHeldLocks)) {
return true;
}
@@ -350,7 +361,7 @@ public class EtcdRegistry implements Registry {
}));
// save the leaseId for release Lock
- leaseIdMap.put(key, leaseId);
+ threadHeldLocks.put(key, new LockEntry(leaseId));
return true;
} catch (TimeoutException timeoutException) {
log.debug("Acquire lock: {} in {}/ms timeout", key, timeout);
@@ -369,10 +380,24 @@ public class EtcdRegistry implements Registry {
@Override
public boolean releaseLock(String key) {
try {
- Long leaseId = threadLocalLockMap.get().get(key);
- client.getLeaseClient().revoke(leaseId);
- threadLocalLockMap.get().remove(key);
- if (threadLocalLockMap.get().isEmpty()) {
+ Map<String, LockEntry> lockEntryMap = threadLocalLockMap.get();
+ if (lockEntryMap == null) {
+ return true;
+ }
+ LockEntry lockEntry = lockEntryMap.get(key);
+ if (lockEntry == null) {
+ return true;
+ }
+ int newLockCount = lockEntry.lockCount.decrementAndGet();
+ if (newLockCount > 0) {
+ return true;
+ }
+ if (newLockCount < 0) {
+ throw new IllegalMonitorStateException("Etcd lock count has
gone negative for lock: " + key);
+ }
+ client.getLeaseClient().revoke(lockEntry.leaseId);
+ lockEntryMap.remove(key);
+ if (lockEntryMap.isEmpty()) {
threadLocalLockMap.remove();
}
} catch (Exception e) {
@@ -418,4 +443,14 @@ public class EtcdRegistry implements Registry {
.build();
}
+ private static class LockEntry {
+
+ final Long leaseId;
+ final AtomicInteger lockCount = new AtomicInteger(1);
+
+ private LockEntry(Long leaseId) {
+ this.leaseId = leaseId;
+ }
+ }
+
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
index 5db9798d50..b819ef0eee 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
@@ -258,6 +258,29 @@ public abstract class RegistryTestCase<R extends Registry>
{
assertThat(acquireResult.get()).isTrue();
}
+ @SneakyThrows
+ @Test
+ public void testReentrantLock() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ CompletableFuture<Boolean> acquireResult =
+ CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isFalse();
+
+ assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ acquireResult = CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isFalse();
+
+ assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ acquireResult = CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isTrue();
+ }
+
public abstract R createRegistry();
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
index 3b1acf1d4d..434f5c49b7 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java
@@ -24,8 +24,9 @@ import
org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryLo
import
org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryLockRepository;
import java.util.Date;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -42,7 +43,7 @@ public class JdbcRegistryLockManager implements
IJdbcRegistryLockManager {
private final JdbcRegistryLockRepository jdbcRegistryLockRepository;
// lockKey -> LockEntry
- private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new
HashMap<>();
+ private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new
ConcurrentHashMap<>();
public JdbcRegistryLockManager(JdbcRegistryProperties
jdbcRegistryProperties,
JdbcRegistryLockRepository
jdbcRegistryLockRepository) {
@@ -54,8 +55,7 @@ public class JdbcRegistryLockManager implements
IJdbcRegistryLockManager {
public void acquireJdbcRegistryLock(Long clientId, String lockKey) {
String lockOwner = LockUtils.getLockOwner();
while (true) {
- LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
- if (lockEntry != null &&
lockOwner.equals(lockEntry.getLockOwner())) {
+ if (tryReenterLock(lockKey, lockOwner)) {
return;
}
JdbcRegistryLockDTO jdbcRegistryLock =
JdbcRegistryLockDTO.builder()
@@ -85,13 +85,21 @@ public class JdbcRegistryLockManager implements
IJdbcRegistryLockManager {
}
}
+ private boolean tryReenterLock(String lockKey, String lockAcquirer) {
+ LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
+ if (lockEntry != null &&
lockAcquirer.equals(lockEntry.getLockOwner())) {
+ lockEntry.lockCount.incrementAndGet();
+ return true;
+ }
+ return false;
+ }
+
@Override
public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long
timeout) {
String lockOwner = LockUtils.getLockOwner();
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= timeout) {
- LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
- if (lockEntry != null &&
lockOwner.equals(lockEntry.getLockOwner())) {
+ if (tryReenterLock(lockKey, lockOwner)) {
return true;
}
JdbcRegistryLockDTO jdbcRegistryLock =
JdbcRegistryLockDTO.builder()
@@ -124,14 +132,22 @@ public class JdbcRegistryLockManager implements
IJdbcRegistryLockManager {
@Override
public void releaseJdbcRegistryLock(Long clientId, String lockKey) {
+ String lockOwner = LockUtils.getLockOwner();
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
- if (lockEntry == null) {
+ if (lockEntry == null || !lockOwner.equals(lockEntry.getLockOwner())) {
return;
}
if (!clientId.equals(lockEntry.getJdbcRegistryLock().getClientId())) {
throw new UnsupportedOperationException(
"The client " + clientId + " is not the lock owner of the
lock: " + lockKey);
}
+ int newLockCount = lockEntry.lockCount.decrementAndGet();
+ if (newLockCount > 0) {
+ return;
+ }
+ if (newLockCount < 0) {
+ throw new IllegalMonitorStateException("Jdbc lock count has gone
negative for lock: " + lockKey);
+ }
jdbcRegistryLockRepository.deleteById(lockEntry.getJdbcRegistryLock().getId());
jdbcRegistryLockHolderMap.remove(lockKey);
}
@@ -144,6 +160,7 @@ public class JdbcRegistryLockManager implements
IJdbcRegistryLockManager {
private String lockKey;
private String lockOwner;
+ final AtomicInteger lockCount = new AtomicInteger(1);
private JdbcRegistryLockDTO jdbcRegistryLock;
}
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index efa9411e6b..9f2d9a984d 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -218,12 +218,6 @@ final class ZookeeperRegistry implements Registry {
try {
interProcessMutex =
Optional.ofNullable(processMutexMap.get(key)).orElse(new
InterProcessMutex(client, key));
- if (interProcessMutex.isAcquiredInThisProcess()) {
- // Since etcd/jdbc cannot implement a reentrant lock, we need
to check if the lock is already acquired
- // If it is already acquired, return true directly
- // This means you only need to release once when you acquire
multiple times
- return true;
- }
interProcessMutex.acquire();
processMutexMap.put(key, interProcessMutex);
return true;
@@ -250,9 +244,6 @@ final class ZookeeperRegistry implements Registry {
try {
interProcessMutex =
Optional.ofNullable(processMutexMap.get(key)).orElse(new
InterProcessMutex(client, key));
- if (interProcessMutex.isAcquiredInThisProcess()) {
- return true;
- }
if (interProcessMutex.acquire(timeout, MILLISECONDS)) {
processMutexMap.put(key, interProcessMutex);
return true;
@@ -282,6 +273,9 @@ final class ZookeeperRegistry implements Registry {
}
try {
interProcessMutex.release();
+ if (interProcessMutex.isOwnedByCurrentThread()) {
+ return true;
+ }
processMutexMap.remove(key);
if (processMutexMap.isEmpty()) {
threadLocalLockMap.remove();