ruanwenjun commented on code in PR #17744:
URL:
https://github.com/apache/dolphinscheduler/pull/17744#discussion_r2569320787
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -297,13 +298,9 @@ public boolean exists(String key) {
* get the lock with a lease
*/
@Override
- public boolean acquireLock(String key) {
- Map<String, Long> leaseIdMap = threadLocalLockMap.get();
- if (null == leaseIdMap) {
- leaseIdMap = new HashMap<>();
- threadLocalLockMap.set(leaseIdMap);
- }
- if (leaseIdMap.containsKey(key)) {
+ public boolean acquireLock(String lockKey) {
+ Map<String, LockEntry> lockMap = getLockMapFromThreadLocal();
+ if (currentThreadIsReentrant(lockKey, lockMap)) {
Review Comment:
```suggestion
Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
if (acquireBasedOnThreadHeldLocks(lockKey, threadHeldLocks)) {
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -315,27 +312,41 @@ public boolean acquireLock(String key) {
// keep the lease
client.getLeaseClient().keepAlive(leaseId,
Observers.observer(response -> {
}));
- lockClient.lock(byteSequence(key), leaseId).get();
+ lockClient.lock(byteSequence(lockKey), leaseId).get();
// save the leaseId for release Lock
- leaseIdMap.put(key, leaseId);
+ lockMap.put(lockKey, new LockEntry(leaseId));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
} catch (Exception e) {
- throw new RegistryException("etcd get lock error, lockKey: " +
key, e);
+ throw new RegistryException("etcd get lock error, lockKey: " +
lockKey, e);
+ }
+ }
+
+ private static boolean currentThreadIsReentrant(String lockKey,
Map<String, LockEntry> lockEntryMap) {
+ LockEntry lockEntry = lockEntryMap.get(lockKey);
+ if (lockEntry != null) {
+ lockEntry.lockCount.incrementAndGet();
+ return true;
}
+ return false;
+ }
Review Comment:
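The method name is misleading: `currentThreadIsReentrant` reads like a pure check, but the method also increments the lock count when the lock key is already present in the thread's lock map. Please rename it so the side effect is clear, for example: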
```suggestion
private static boolean acquireBasedOnThreadHeldLocks(String lockKey,
Map<String, LockEntry> threadHeldLocks) {
LockEntry lockEntry = threadHeldLocks.get(lockKey);
if (lockEntry != null) {
lockEntry.lockCount.incrementAndGet();
return true;
}
return false;
}
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java:
##########
@@ -85,13 +85,21 @@ public void acquireJdbcRegistryLock(Long clientId, String
lockKey) {
}
}
+ private boolean currentThreadIsReentrant(String lockKey, String lockOwner)
{
Review Comment:
```suggestion
private boolean tryReenterLock(String lockKey, String lockAcquirer) {
```
##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java:
##########
@@ -315,27 +312,41 @@ public boolean acquireLock(String key) {
// keep the lease
client.getLeaseClient().keepAlive(leaseId,
Observers.observer(response -> {
}));
- lockClient.lock(byteSequence(key), leaseId).get();
+ lockClient.lock(byteSequence(lockKey), leaseId).get();
// save the leaseId for release Lock
- leaseIdMap.put(key, leaseId);
+ lockMap.put(lockKey, new LockEntry(leaseId));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
} catch (Exception e) {
- throw new RegistryException("etcd get lock error, lockKey: " +
key, e);
+ throw new RegistryException("etcd get lock error, lockKey: " +
lockKey, e);
+ }
+ }
+
+ private static boolean currentThreadIsReentrant(String lockKey,
Map<String, LockEntry> lockEntryMap) {
+ LockEntry lockEntry = lockEntryMap.get(lockKey);
+ if (lockEntry != null) {
+ lockEntry.lockCount.incrementAndGet();
+ return true;
}
+ return false;
+ }
+
+ private static Map<String, LockEntry> getLockMapFromThreadLocal() {
Review Comment:
```suggestion
private static Map<String, LockEntry> getThreadLocks() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]