This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 1b1ef58b0 [#4129] improvement(core): Support hold multiple tree lock
within a thread at the same time (#4130)
1b1ef58b0 is described below
commit 1b1ef58b0fe9f9f84575319fbd953d8ea3351f61
Author: Qi Yu <[email protected]>
AuthorDate: Fri Jul 12 10:09:06 2024 +0800
[#4129] improvement(core): Support hold multiple tree lock within a thread
at the same time (#4130)
### What changes were proposed in this pull request?
Add the value of the name identifier in the holdingThreadTimestamp to
support holding multiple tree lock at the same time.
### Why are the changes needed?
To support more user sceanrio
Fix: #4129
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Add new test class `TestTreeLockUtils`
---
.../com/datastrato/gravitino/lock/LockManager.java | 6 +-
.../com/datastrato/gravitino/lock/TreeLock.java | 21 +++++-
.../datastrato/gravitino/lock/TreeLockNode.java | 82 +++++++++++++++-------
.../gravitino/lock/TestTreeLockUtils.java | 51 ++++++++++++++
4 files changed, 131 insertions(+), 29 deletions(-)
diff --git a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
index b1dbb27fe..9fb0ef6e1 100644
--- a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
+++ b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
@@ -132,12 +132,12 @@ public class LockManager {
// Check self
node.getHoldingThreadTimestamp()
.forEach(
- (thread, ts) -> {
+ (threadIdentifier, ts) -> {
// If the thread is holding the lock for more than 30 seconds,
we will log it.
if (System.currentTimeMillis() - ts > 30000) {
LOG.warn(
- "Dead lock detected for thread {} on node {}, threads that
holding the node: {} ",
- thread,
+ "Dead lock detected for thread with identifier {} on node
{}, threads that holding the node: {} ",
+ threadIdentifier,
node,
node.getHoldingThreadTimestamp());
}
diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
index 76d9ab028..02cb0c757 100644
--- a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
+++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java
@@ -104,8 +104,17 @@ public class TreeLock {
try {
treeLockNode.lock(type);
heldLocks.push(Pair.of(treeLockNode, type));
+
+ treeLockNode.addHoldingThreadTimestamp(
+ Thread.currentThread(), identifier, System.currentTimeMillis());
if (LOG.isTraceEnabled()) {
- LOG.trace("Locked node: {}, lock type: {}", treeLockNode, type);
+ LOG.trace(
+ "Node {} has been lock with '{}' lock, hold by {} with ident
'{}' at {}",
+ this,
+ lockType,
+ Thread.currentThread(),
+ identifier,
+ System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error(
@@ -140,8 +149,16 @@ public class TreeLock {
TreeLockNode current = pair.getLeft();
LockType type = pair.getRight();
current.unlock(type);
+
+ long holdStartTime =
current.removeHoldingThreadTimestamp(Thread.currentThread(), identifier);
if (LOG.isTraceEnabled()) {
- LOG.trace("Unlocked node: {}, lock type: {}", current, type);
+ LOG.trace(
+ "Node {} has been unlock with '{}' lock, hold by {} with ident
'{}' for {} ms",
+ this,
+ lockType,
+ Thread.currentThread(),
+ identifier,
+ System.currentTimeMillis() - holdStartTime);
}
}
diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
index a4953c541..92db979aa 100644
--- a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
+++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java
@@ -19,6 +19,7 @@
package com.datastrato.gravitino.lock;
+import com.datastrato.gravitino.NameIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -44,13 +45,60 @@ public class TreeLockNode {
private final String name;
private final ReentrantReadWriteLock readWriteLock;
@VisibleForTesting final Map<String, TreeLockNode> childMap;
- private final Map<Thread, Long> holdingThreadTimestamp = new
ConcurrentHashMap<>();
+
+ private final Map<ThreadIdentifier, Long> holdingThreadTimestamp = new
ConcurrentHashMap<>();
// The reference count of this node. The reference count is used to track
the number of the
// TreeLocks that are using this node. If the reference count is 0, it means
that no TreeLock is
// using this node, and this node can be removed from the tree.
private final AtomicLong referenceCount = new AtomicLong();
+ /**
+ * The identifier of a thread. This class is used to identify this tree lock
node is held by which
+ * thread and identifier because one thread can hold multiple tree lock
nodes at the same time.
+ * For example, a thread can hold the lock of the root node and the lock of
the child node at the
+ * same time.
+ */
+ static class ThreadIdentifier {
+ private final Thread thread;
+ private final NameIdentifier ident;
+
+ public ThreadIdentifier(Thread thread, NameIdentifier ident) {
+ this.thread = thread;
+ this.ident = ident;
+ }
+
+ static ThreadIdentifier of(Thread thread, NameIdentifier identifier) {
+ return new ThreadIdentifier(thread, identifier);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !(o instanceof ThreadIdentifier)) {
+ return false;
+ }
+ ThreadIdentifier that = (ThreadIdentifier) o;
+ return Objects.equal(thread, that.thread) && Objects.equal(ident,
that.ident);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(thread, ident);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("ThreadIdentifier{");
+ sb.append("thread=").append(thread);
+ sb.append(", ident=").append(ident);
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
protected TreeLockNode(String name) {
this.name = name;
this.readWriteLock = new ReentrantReadWriteLock();
@@ -61,10 +109,18 @@ public class TreeLockNode {
return name;
}
- Map<Thread, Long> getHoldingThreadTimestamp() {
+ Map<ThreadIdentifier, Long> getHoldingThreadTimestamp() {
return holdingThreadTimestamp;
}
+ void addHoldingThreadTimestamp(Thread currentThread, NameIdentifier
identifier, long timestamp) {
+ holdingThreadTimestamp.put(ThreadIdentifier.of(currentThread, identifier),
timestamp);
+ }
+
+ long removeHoldingThreadTimestamp(Thread currentThread, NameIdentifier
identifier) {
+ return holdingThreadTimestamp.remove(ThreadIdentifier.of(currentThread,
identifier));
+ }
+
/**
* Increase the reference count of this node. The reference count should
always be greater than or
* equal to 0.
@@ -97,17 +153,6 @@ public class TreeLockNode {
} else {
readWriteLock.writeLock().lock();
}
-
- holdingThreadTimestamp.put(Thread.currentThread(),
System.currentTimeMillis());
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Node {} has been lock with '{}' lock, hold by {} at {}, current
holding threads: {}",
- this,
- lockType,
- Thread.currentThread(),
- System.currentTimeMillis(),
- holdingThreadTimestamp);
- }
}
/**
@@ -125,17 +170,6 @@ public class TreeLockNode {
}
this.referenceCount.decrementAndGet();
-
- long holdStartTime = holdingThreadTimestamp.remove(Thread.currentThread());
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Node {} has been unlock with '{}' lock, hold by {} for {} ms,
current holding threads: {}",
- this,
- lockType,
- Thread.currentThread(),
- System.currentTimeMillis() - holdStartTime,
- holdingThreadTimestamp);
- }
}
/**
diff --git
a/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java
b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java
new file mode 100644
index 000000000..43fb22443
--- /dev/null
+++ b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLockUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastrato.gravitino.lock;
+
+import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import com.datastrato.gravitino.Config;
+import com.datastrato.gravitino.GravitinoEnv;
+import com.datastrato.gravitino.NameIdentifier;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.Test;
+
+public class TestTreeLockUtils {
+
+ @Test
+ void testHolderMultipleLock() throws Exception {
+ Config config = mock(Config.class);
+ doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+ doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+ doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
+
+ TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of("test"),
+ LockType.READ,
+ () ->
+ TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of("test", "test1"), LockType.WRITE, () ->
null));
+ }
+}