ruanwenjun commented on code in PR #17744:
URL: 
https://github.com/apache/dolphinscheduler/pull/17744#discussion_r2569320787


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -297,13 +298,9 @@ public boolean exists(String key) {
      * get the lock with a lease
      */
     @Override
-    public boolean acquireLock(String key) {
-        Map<String, Long> leaseIdMap = threadLocalLockMap.get();
-        if (null == leaseIdMap) {
-            leaseIdMap = new HashMap<>();
-            threadLocalLockMap.set(leaseIdMap);
-        }
-        if (leaseIdMap.containsKey(key)) {
+    public boolean acquireLock(String lockKey) {
+        Map<String, LockEntry> lockMap = getLockMapFromThreadLocal();
+        if (currentThreadIsReentrant(lockKey, lockMap)) {

Review Comment:
   ```suggestion
           Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
           if (acquireBasedOnThreadHeldLocks(lockKey, threadHeldLocks)) {
   ```



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -315,27 +312,41 @@ public boolean acquireLock(String key) {
             // keep the lease
             client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
             }));
-            lockClient.lock(byteSequence(key), leaseId).get();
+            lockClient.lock(byteSequence(lockKey), leaseId).get();
 
             // save the leaseId for release Lock
-            leaseIdMap.put(key, leaseId);
+            lockMap.put(lockKey, new LockEntry(leaseId));
             return true;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RegistryException("etcd get lock error", e);
         } catch (Exception e) {
-            throw new RegistryException("etcd get lock error, lockKey: " + key, e);
+            throw new RegistryException("etcd get lock error, lockKey: " + lockKey, e);
+        }
+    }
+
+    private static boolean currentThreadIsReentrant(String lockKey, Map<String, LockEntry> lockEntryMap) {
+        LockEntry lockEntry = lockEntryMap.get(lockKey);
+        if (lockEntry != null) {
+            lockEntry.lockCount.incrementAndGet();
+            return true;
         }
+        return false;
+    }

Review Comment:
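   The method name `currentThreadIsReentrant` is strange: it reads like a pure check, but the method also increments the lock count when the current thread already holds the lock. Consider a name that conveys the re-entrant acquire: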
   ```suggestion
       private static boolean acquireBasedOnThreadHeldLocks(String lockKey, Map<String, LockEntry> threadHeldLocks) {
           LockEntry lockEntry = threadHeldLocks.get(lockKey);
           if (lockEntry != null) {
               lockEntry.lockCount.incrementAndGet();
               return true;
           }
           return false;
       }
   ```



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java:
##########
@@ -85,13 +85,21 @@ public void acquireJdbcRegistryLock(Long clientId, String lockKey) {
         }
     }
 
+    private boolean currentThreadIsReentrant(String lockKey, String lockOwner) {

Review Comment:
   ```suggestion
       private boolean tryReenterLock(String lockKey, String lockAcquirer) {
   ```



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -315,27 +312,41 @@ public boolean acquireLock(String key) {
             // keep the lease
             client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
             }));
-            lockClient.lock(byteSequence(key), leaseId).get();
+            lockClient.lock(byteSequence(lockKey), leaseId).get();
 
             // save the leaseId for release Lock
-            leaseIdMap.put(key, leaseId);
+            lockMap.put(lockKey, new LockEntry(leaseId));
             return true;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RegistryException("etcd get lock error", e);
         } catch (Exception e) {
-            throw new RegistryException("etcd get lock error, lockKey: " + key, e);
+            throw new RegistryException("etcd get lock error, lockKey: " + lockKey, e);
+        }
+    }
+
+    private static boolean currentThreadIsReentrant(String lockKey, Map<String, LockEntry> lockEntryMap) {
+        LockEntry lockEntry = lockEntryMap.get(lockKey);
+        if (lockEntry != null) {
+            lockEntry.lockCount.incrementAndGet();
+            return true;
         }
+        return false;
+    }
+
+    private static Map<String, LockEntry> getLockMapFromThreadLocal() {

Review Comment:
   ```suggestion
       private static Map<String, LockEntry> getThreadLocks() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to