Copilot commented on code in PR #17744:
URL:
https://github.com/apache/dolphinscheduler/pull/17744#discussion_r2566933997
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java:
##########
@@ -92,6 +94,7 @@ public boolean acquireJdbcRegistryLock(Long clientId, String
lockKey, long timeo
while (System.currentTimeMillis() - start <= timeout) {
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
if (lockEntry != null &&
lockOwner.equals(lockEntry.getLockOwner())) {
+ lockEntry.lockCount.incrementAndGet();
Review Comment:
There is a potential race condition when checking for reentrancy. Between
line 95 (checking if `lockEntry` exists) and line 97 (incrementing the
counter), another thread could complete a release and remove the entry from the
map. This could lead to incrementing a counter on a lock entry that is no
longer in the map. Consider using `ConcurrentHashMap.compute()` or
synchronization to ensure atomicity of the check-and-increment operation.
```suggestion
LockEntry reenteredEntry = jdbcRegistryLockHolderMap.computeIfPresent(lockKey, (key, entry) -> {
    if (lockOwner.equals(entry.getLockOwner())) {
        entry.lockCount.incrementAndGet();
    }
    return entry;
});
// Use the entry returned by the atomic call; a second get() could observe a removed entry.
boolean reentered = reenteredEntry != null && lockOwner.equals(reenteredEntry.getLockOwner());
if (reentered) {
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java:
##########
@@ -258,6 +258,40 @@ public void testReleaseLock() {
assertThat(acquireResult.get()).isTrue();
}
+ @SneakyThrows
+ @Test
+ public void testReentrantLock() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ // Acquire the lock in the main thread
+ // It should acquire success
Review Comment:
The comment is misleading. It says "Acquire the lock in the main thread" but
the actual behavior being tested is that the lock is acquired reentrantly (a
second time) in the same thread that already holds it.
```suggestion
// 2. Acquire the lock reentrantly (a second time) in the same thread that already holds it
// It should acquire successfully due to reentrant locking
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java:
##########
@@ -258,6 +258,40 @@ public void testReleaseLock() {
assertThat(acquireResult.get()).isTrue();
}
+ @SneakyThrows
+ @Test
+ public void testReentrantLock() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ // Acquire the lock in the main thread
+ // It should acquire success
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // 2. Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture<Boolean> acquireResult =
+ CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isFalse();
+
+ // 3. Release the lock in the main thread
+ assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ acquireResult = CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isFalse();
+
+ // 4. Release the lock in the main thread again
+ assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire success
+ acquireResult = CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isTrue();
Review Comment:
The comment is misleading. It says "Acquire the lock at another thread" but
the test is checking that the lock acquisition **succeeds** because the main
thread has fully released it. Consider clarifying this with: "// Try to acquire
the lock from another thread; it should now succeed because the main thread has
fully released it"
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java:
##########
@@ -124,14 +127,22 @@ public boolean acquireJdbcRegistryLock(Long clientId,
String lockKey, long timeo
@Override
public void releaseJdbcRegistryLock(Long clientId, String lockKey) {
+ String lockOwner = LockUtils.getLockOwner();
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
- if (lockEntry == null) {
+ if (lockEntry == null || !lockOwner.equals(lockEntry.getLockOwner())) {
return;
}
if (!clientId.equals(lockEntry.getJdbcRegistryLock().getClientId())) {
throw new UnsupportedOperationException(
"The client " + clientId + " is not the lock owner of the lock: " + lockKey);
}
+ int newLockCount = lockEntry.lockCount.decrementAndGet();
+ if (newLockCount > 0) {
+ return;
+ }
+ if (newLockCount < 0) {
+ throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + lockKey);
+ }
jdbcRegistryLockRepository.deleteById(lockEntry.getJdbcRegistryLock().getId());
jdbcRegistryLockHolderMap.remove(lockKey);
Review Comment:
There is a potential race condition in the release logic. Between checking
the lock count at line 139-142 and removing the entry from the map at line 147,
another thread could be attempting to acquire the lock. This could lead to a
situation where the lock is removed from the map while another thread is
checking for reentrancy, causing inconsistent state. Consider using
synchronization or `ConcurrentHashMap.compute()` to ensure atomicity of the
entire release operation.
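
A minimal sketch of what an atomic release could look like, assuming the holder map is a `ConcurrentHashMap`. The class `ReentrantLockTable`, its `Entry` type, and the method names are hypothetical stand-ins, not the PR's code, and the database delete is left out:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the lock manager; all names are illustrative only.
final class ReentrantLockTable {

    // Simplified lock entry: owner identity plus a reentrancy counter.
    static final class Entry {
        final String owner;
        final AtomicInteger lockCount = new AtomicInteger(1);

        Entry(String owner) {
            this.owner = owner;
        }
    }

    private final ConcurrentHashMap<String, Entry> holders = new ConcurrentHashMap<>();

    // Acquires or re-enters the lock; returns false if another owner holds it.
    boolean tryAcquire(String lockKey, String owner) {
        Entry result = holders.compute(lockKey, (key, entry) -> {
            if (entry == null) {
                return new Entry(owner);
            }
            if (owner.equals(entry.owner)) {
                entry.lockCount.incrementAndGet();
            }
            return entry;
        });
        return owner.equals(result.owner);
    }

    // Decrements the count and removes the entry in one atomic step.
    // Returns true only when this call fully released the lock.
    boolean release(String lockKey, String owner) {
        boolean[] removed = {false};
        holders.compute(lockKey, (key, entry) -> {
            if (entry == null || !owner.equals(entry.owner)) {
                return entry; // not held by this owner: leave the mapping untouched
            }
            if (entry.lockCount.decrementAndGet() > 0) {
                return entry; // still held reentrantly
            }
            removed[0] = true;
            return null; // returning null removes the mapping
        });
        return removed[0];
    }

    boolean isHeld(String lockKey) {
        return holders.containsKey(lockKey);
    }
}
```

Because `compute()` can block other updates to the same key while the function runs, the function should stay short; a real implementation would probably perform the repository delete outside it, after `release` reports that the entry was removed.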
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java:
##########
@@ -218,12 +218,6 @@ public boolean acquireLock(String key) {
try {
interProcessMutex =
Optional.ofNullable(processMutexMap.get(key)).orElse(new
InterProcessMutex(client, key));
- if (interProcessMutex.isAcquiredInThisProcess()) {
- // Since etcd/jdbc cannot implement a reentrant lock, we need to check if the lock is already acquired
- // If it is already acquired, return true directly
- // This means you only need to release once when you acquire multiple times
- return true;
- }
interProcessMutex.acquire();
processMutexMap.put(key, interProcessMutex);
return true;
Review Comment:
The removal of the reentrant lock check before acquiring the lock is
problematic. Curator's `InterProcessMutex` already supports reentrancy
natively, and by removing the early return check, you're now calling
`acquire()` multiple times on the same lock within the same thread. This
changes the semantics: now you need to call `release()` the same number of
times as `acquire()` was called. However, unlike JDBC and Etcd implementations
which track the count explicitly, this relies on Curator's internal reentrancy
counter. This could lead to inconsistent behavior across registry
implementations if the release logic doesn't properly handle all reentrant
acquisitions.
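
To make the changed semantics concrete, here is a small sketch that uses `java.util.concurrent.locks.ReentrantLock` as a local stand-in for a reentrant mutex (an assumption for illustration; it is not Curator's `InterProcessMutex`, and `BalancedReleaseDemo` is a hypothetical name). It shows that a lock acquired N times on one thread stays held until it has been released N times:

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: ReentrantLock stands in for any reentrant mutex.
final class BalancedReleaseDemo {

    // Acquires the lock `times` times on the current thread, then releases it
    // `times` times, recording the remaining hold count after each release.
    static int[] holdCountsWhileReleasing(ReentrantLock lock, int times) {
        for (int i = 0; i < times; i++) {
            lock.lock();
        }
        int[] counts = new int[times];
        for (int i = 0; i < times; i++) {
            lock.unlock();
            counts[i] = lock.getHoldCount();
        }
        return counts;
    }
}
```

With two acquisitions, the first release leaves a hold count of 1 and only the second frees the lock, which is the behavior each registry implementation's `releaseLock` would need to match.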
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java:
##########
@@ -258,6 +258,40 @@ public void testReleaseLock() {
assertThat(acquireResult.get()).isTrue();
}
+ @SneakyThrows
+ @Test
+ public void testReentrantLock() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+ // Acquire the lock in the main thread
+ // It should acquire success
+ assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // 2. Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture<Boolean> acquireResult =
+ CompletableFuture.supplyAsync(() ->
registry.acquireLock(lockKey, 3000));
+ assertThat(acquireResult.get()).isFalse();
+
+ // 3. Release the lock in the main thread
+ assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
Review Comment:
The comment is misleading. It says "Acquire the lock at another thread" but
the test is checking that the lock acquisition **fails** because the main
thread still holds it. Consider clarifying this with: "// Try to acquire the
lock from another thread; it should fail because the main thread still holds it"
```suggestion
// Try to acquire the lock from another thread; it should fail because the main thread still holds it
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -369,8 +374,18 @@ public boolean acquireLock(String key, long timeout) {
@Override
public boolean releaseLock(String key) {
try {
- Long leaseId = threadLocalLockMap.get().get(key);
- client.getLeaseClient().revoke(leaseId);
+ LockEntry lockEntry = threadLocalLockMap.get().get(key);
Review Comment:
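There is a potential NPE risk when calling
`threadLocalLockMap.get().get(key)`. If the current thread's map has already
been removed (e.g., at line 391) or was never set, `threadLocalLockMap.get()`
can return null (assuming the ThreadLocal has no initial value) and the chained
call will throw a NullPointerException. Because a ThreadLocal value is
per-thread, this is a concern for a release without a matching acquire on the
same thread, not for interference from another thread. Consider checking if
`threadLocalLockMap.get()` is null before calling `.get(key)` on it, similar to
the pattern used in `acquireLock()` methods.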
```suggestion
Map<String, LockEntry> lockMap = threadLocalLockMap.get();
if (lockMap == null) {
return true;
}
LockEntry lockEntry = lockMap.get(key);
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -369,8 +374,18 @@ public boolean acquireLock(String key, long timeout) {
@Override
public boolean releaseLock(String key) {
try {
- Long leaseId = threadLocalLockMap.get().get(key);
- client.getLeaseClient().revoke(leaseId);
+ LockEntry lockEntry = threadLocalLockMap.get().get(key);
+ if (lockEntry == null) {
+ return true;
+ }
+ int newLockCount = lockEntry.lockCount.decrementAndGet();
+ if (newLockCount > 0) {
+ return true;
+ }
+ if (newLockCount < 0) {
+ throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + key);
+ }
+ client.getLeaseClient().revoke(lockEntry.leaseId);
threadLocalLockMap.get().remove(key);
if (threadLocalLockMap.get().isEmpty()) {
threadLocalLockMap.remove();
Review Comment:
There is a potential NPE risk when calling
`threadLocalLockMap.get().remove(key)` and
`threadLocalLockMap.get().isEmpty()`. If the ThreadLocal has been cleared, this
will throw a NullPointerException. Consider storing the result of
`threadLocalLockMap.get()` in a local variable and checking for null before
performing operations on it.
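
A minimal sketch of that pattern, assuming the ThreadLocal has no initial value. `ThreadLocalLockRelease`, its simplified `LockEntry`, and the helper methods are hypothetical, and the etcd lease revocation is replaced by a comment:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the etcd registry's per-thread lock bookkeeping.
final class ThreadLocalLockRelease {

    static final class LockEntry {
        final long leaseId;
        final AtomicInteger lockCount = new AtomicInteger(1);

        LockEntry(long leaseId) {
            this.leaseId = leaseId;
        }
    }

    // No initial value, so get() returns null until the thread sets a map.
    private static final ThreadLocal<Map<String, LockEntry>> LOCKS = new ThreadLocal<>();

    static void acquire(String key, long leaseId) {
        Map<String, LockEntry> lockMap = LOCKS.get();
        if (lockMap == null) {
            lockMap = new HashMap<>();
            LOCKS.set(lockMap);
        }
        LockEntry existing = lockMap.get(key);
        if (existing != null) {
            existing.lockCount.incrementAndGet(); // reentrant acquire
        } else {
            lockMap.put(key, new LockEntry(leaseId));
        }
    }

    static boolean release(String key) {
        // Read the ThreadLocal once and reuse the local reference below.
        Map<String, LockEntry> lockMap = LOCKS.get();
        if (lockMap == null) {
            return true; // nothing held by this thread
        }
        LockEntry lockEntry = lockMap.get(key);
        if (lockEntry == null) {
            return true;
        }
        if (lockEntry.lockCount.decrementAndGet() > 0) {
            return true; // still held reentrantly
        }
        // The real code would revoke the lease for lockEntry.leaseId here.
        lockMap.remove(key);
        if (lockMap.isEmpty()) {
            LOCKS.remove();
        }
        return true;
    }

    static boolean hasLocks() {
        return LOCKS.get() != null;
    }
}
```

Calling `release` on a thread that never acquired anything returns `true` instead of throwing, and the map is dropped from the ThreadLocal only after the last key is fully released.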
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java:
##########
@@ -56,6 +57,7 @@ public void acquireJdbcRegistryLock(Long clientId, String
lockKey) {
while (true) {
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
if (lockEntry != null &&
lockOwner.equals(lockEntry.getLockOwner())) {
+ lockEntry.lockCount.incrementAndGet();
Review Comment:
There is a potential race condition when checking for reentrancy. Between
line 58 (checking if `lockEntry` exists) and line 60 (incrementing the
counter), another thread could complete a release and remove the entry from the
map. This could lead to incrementing a counter on a lock entry that is no
longer in the map. Consider using `ConcurrentHashMap.compute()` or
synchronization to ensure atomicity of the check-and-increment operation.
```suggestion
LockEntry reenteredEntry = jdbcRegistryLockHolderMap.computeIfPresent(lockKey, (key, entry) -> {
    if (lockOwner.equals(entry.getLockOwner())) {
        entry.lockCount.incrementAndGet();
    }
    return entry;
});
// Use the entry returned by the atomic call; a second get() could observe a removed entry.
boolean reentered = reenteredEntry != null && lockOwner.equals(reenteredEntry.getLockOwner());
if (reentered) {
```