This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new cfba2f2b14 HBASE-27064 CME in TestRegionNormalizerWorkQueue (#4468)
cfba2f2b14 is described below
commit cfba2f2b1479510a404b6f01418c91e04fb4fe9a
Author: Andrew Purtell <[email protected]>
AuthorDate: Tue May 31 11:33:06 2022 -0700
HBASE-27064 CME in TestRegionNormalizerWorkQueue (#4468)
Signed-off-by: Viraj Jasani <[email protected]>
---
.../normalizer/RegionNormalizerWorkQueue.java | 107 ++++++---------------
1 file changed, 32 insertions(+), 75 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
index c1cab5f97b..f8c969a9f6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
@@ -25,7 +25,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -39,8 +39,6 @@ import org.apache.yetus.audience.InterfaceAudience;
* {@link BlockingQueue}.</li>
* <li>Allows a producer to insert an item at the head of the queue, if
desired.</li>
* </ul>
- * Assumes low-frequency and low-parallelism concurrent access, so protects
state using a simplistic
- * synchronization strategy.
*/
@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {
@@ -48,53 +46,15 @@ class RegionNormalizerWorkQueue<E> {
/** Underlying storage structure that gives us the Set behavior and FIFO
retrieval policy. */
private LinkedHashSet<E> delegate;
- // the locking structure used here follows the example found in
LinkedBlockingQueue. The
- // difference is that our locks guard access to `delegate` rather than the
head node.
-
- /** Lock held by take, poll, etc */
- private final ReentrantLock takeLock;
-
+ /** Lock for puts and takes **/
+ private final ReentrantReadWriteLock lock;
/** Wait queue for waiting takes */
private final Condition notEmpty;
- /** Lock held by put, offer, etc */
- private final ReentrantLock putLock;
-
RegionNormalizerWorkQueue() {
delegate = new LinkedHashSet<>();
- takeLock = new ReentrantLock();
- notEmpty = takeLock.newCondition();
- putLock = new ReentrantLock();
- }
-
- /**
- * Signals a waiting take. Called only from put/offer (which do not
otherwise ordinarily lock
- * takeLock.)
- */
- private void signalNotEmpty() {
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- }
-
- /**
- * Locks to prevent both puts and takes.
- */
- private void fullyLock() {
- putLock.lock();
- takeLock.lock();
- }
-
- /**
- * Unlocks to allow both puts and takes.
- */
- private void fullyUnlock() {
- takeLock.unlock();
- putLock.unlock();
+ lock = new ReentrantReadWriteLock();
+ notEmpty = lock.writeLock().newCondition();
}
/**
@@ -105,16 +65,14 @@ class RegionNormalizerWorkQueue<E> {
if (e == null) {
throw new NullPointerException();
}
-
- putLock.lock();
+ lock.writeLock().lock();
try {
delegate.add(e);
+ if (!delegate.isEmpty()) {
+ notEmpty.signal();
+ }
} finally {
- putLock.unlock();
- }
-
- if (!delegate.isEmpty()) {
- signalNotEmpty();
+ lock.writeLock().unlock();
}
}
@@ -138,16 +96,14 @@ class RegionNormalizerWorkQueue<E> {
if (c == null) {
throw new NullPointerException();
}
-
- putLock.lock();
+ lock.writeLock().lock();
try {
delegate.addAll(c);
+ if (!delegate.isEmpty()) {
+ notEmpty.signal();
+ }
} finally {
- putLock.unlock();
- }
-
- if (!delegate.isEmpty()) {
- signalNotEmpty();
+ lock.writeLock().unlock();
}
}
@@ -159,19 +115,17 @@ class RegionNormalizerWorkQueue<E> {
if (c == null) {
throw new NullPointerException();
}
-
- fullyLock();
+ lock.writeLock().lock();
try {
final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() +
delegate.size());
copy.addAll(c);
copy.addAll(delegate);
delegate = copy;
+ if (!delegate.isEmpty()) {
+ notEmpty.signal();
+ }
} finally {
- fullyUnlock();
- }
-
- if (!delegate.isEmpty()) {
- signalNotEmpty();
+ lock.writeLock().unlock();
}
}
@@ -183,10 +137,13 @@ class RegionNormalizerWorkQueue<E> {
*/
public E take() throws InterruptedException {
E x;
- takeLock.lockInterruptibly();
+ // Take a write lock. If the delegate's queue is empty we need it to
await(), which will
+ // drop the lock, then reacquire it; or if the queue is not empty we will
use an iterator
+ // to mutate the head.
+ lock.writeLock().lockInterruptibly();
try {
while (delegate.isEmpty()) {
- notEmpty.await();
+ notEmpty.await(); // await drops the lock, then reacquires it
}
final Iterator<E> iter = delegate.iterator();
x = iter.next();
@@ -195,7 +152,7 @@ class RegionNormalizerWorkQueue<E> {
notEmpty.signal();
}
} finally {
- takeLock.unlock();
+ lock.writeLock().unlock();
}
return x;
}
@@ -205,11 +162,11 @@ class RegionNormalizerWorkQueue<E> {
* returns.
*/
public void clear() {
- putLock.lock();
+ lock.writeLock().lock();
try {
delegate.clear();
} finally {
- putLock.unlock();
+ lock.writeLock().unlock();
}
}
@@ -218,21 +175,21 @@ class RegionNormalizerWorkQueue<E> {
* @return the number of elements in this queue
*/
public int size() {
- takeLock.lock();
+ lock.readLock().lock();
try {
return delegate.size();
} finally {
- takeLock.unlock();
+ lock.readLock().unlock();
}
}
@Override
public String toString() {
- takeLock.lock();
+ lock.readLock().lock();
try {
return delegate.toString();
} finally {
- takeLock.unlock();
+ lock.readLock().unlock();
}
}
}