This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 2a59ebffaa HBASE-27064 CME in TestRegionNormalizerWorkQueue (#4468)
2a59ebffaa is described below

commit 2a59ebffaa7ec308401d1f850ac06b59d0ab5bd9
Author: Andrew Purtell <[email protected]>
AuthorDate: Tue May 31 11:33:06 2022 -0700

    HBASE-27064 CME in TestRegionNormalizerWorkQueue (#4468)
    
    Signed-off-by: Viraj Jasani <[email protected]>
---
 .../normalizer/RegionNormalizerWorkQueue.java      | 107 ++++++---------------
 1 file changed, 32 insertions(+), 75 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
index c1cab5f97b..f8c969a9f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java
@@ -25,7 +25,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -39,8 +39,6 @@ import org.apache.yetus.audience.InterfaceAudience;
  * {@link BlockingQueue}.</li>
  * <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
  * </ul>
- * Assumes low-frequency and low-parallelism concurrent access, so protects state using a simplistic
- * synchronization strategy.
  */
 @InterfaceAudience.Private
 class RegionNormalizerWorkQueue<E> {
@@ -48,53 +46,15 @@ class RegionNormalizerWorkQueue<E> {
   /** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
   private LinkedHashSet<E> delegate;
 
-  // the locking structure used here follows the example found in LinkedBlockingQueue. The
-  // difference is that our locks guard access to `delegate` rather than the head node.
-
-  /** Lock held by take, poll, etc */
-  private final ReentrantLock takeLock;
-
+  /** Lock for puts and takes **/
+  private final ReentrantReadWriteLock lock;
   /** Wait queue for waiting takes */
   private final Condition notEmpty;
 
-  /** Lock held by put, offer, etc */
-  private final ReentrantLock putLock;
-
   RegionNormalizerWorkQueue() {
     delegate = new LinkedHashSet<>();
-    takeLock = new ReentrantLock();
-    notEmpty = takeLock.newCondition();
-    putLock = new ReentrantLock();
-  }
-
-  /**
-   * Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock
-   * takeLock.)
-   */
-  private void signalNotEmpty() {
-    final ReentrantLock takeLock = this.takeLock;
-    takeLock.lock();
-    try {
-      notEmpty.signal();
-    } finally {
-      takeLock.unlock();
-    }
-  }
-
-  /**
-   * Locks to prevent both puts and takes.
-   */
-  private void fullyLock() {
-    putLock.lock();
-    takeLock.lock();
-  }
-
-  /**
-   * Unlocks to allow both puts and takes.
-   */
-  private void fullyUnlock() {
-    takeLock.unlock();
-    putLock.unlock();
+    lock = new ReentrantReadWriteLock();
+    notEmpty = lock.writeLock().newCondition();
   }
 
   /**
@@ -105,16 +65,14 @@ class RegionNormalizerWorkQueue<E> {
     if (e == null) {
       throw new NullPointerException();
     }
-
-    putLock.lock();
+    lock.writeLock().lock();
     try {
       delegate.add(e);
+      if (!delegate.isEmpty()) {
+        notEmpty.signal();
+      }
     } finally {
-      putLock.unlock();
-    }
-
-    if (!delegate.isEmpty()) {
-      signalNotEmpty();
+      lock.writeLock().unlock();
     }
   }
 
@@ -138,16 +96,14 @@ class RegionNormalizerWorkQueue<E> {
     if (c == null) {
       throw new NullPointerException();
     }
-
-    putLock.lock();
+    lock.writeLock().lock();
     try {
       delegate.addAll(c);
+      if (!delegate.isEmpty()) {
+        notEmpty.signal();
+      }
     } finally {
-      putLock.unlock();
-    }
-
-    if (!delegate.isEmpty()) {
-      signalNotEmpty();
+      lock.writeLock().unlock();
     }
   }
 
@@ -159,19 +115,17 @@ class RegionNormalizerWorkQueue<E> {
     if (c == null) {
       throw new NullPointerException();
     }
-
-    fullyLock();
+    lock.writeLock().lock();
     try {
       final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
       copy.addAll(c);
       copy.addAll(delegate);
       delegate = copy;
+      if (!delegate.isEmpty()) {
+        notEmpty.signal();
+      }
     } finally {
-      fullyUnlock();
-    }
-
-    if (!delegate.isEmpty()) {
-      signalNotEmpty();
+      lock.writeLock().unlock();
     }
   }
 
@@ -183,10 +137,13 @@ class RegionNormalizerWorkQueue<E> {
    */
   public E take() throws InterruptedException {
     E x;
-    takeLock.lockInterruptibly();
+    // Take a write lock. If the delegate's queue is empty we need it to await(), which will
+    // drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
+    // to mutate the head.
+    lock.writeLock().lockInterruptibly();
     try {
       while (delegate.isEmpty()) {
-        notEmpty.await();
+        notEmpty.await(); // await drops the lock, then reacquires it
       }
       final Iterator<E> iter = delegate.iterator();
       x = iter.next();
@@ -195,7 +152,7 @@ class RegionNormalizerWorkQueue<E> {
         notEmpty.signal();
       }
     } finally {
-      takeLock.unlock();
+      lock.writeLock().unlock();
     }
     return x;
   }
@@ -205,11 +162,11 @@ class RegionNormalizerWorkQueue<E> {
    * returns.
    */
   public void clear() {
-    putLock.lock();
+    lock.writeLock().lock();
     try {
       delegate.clear();
     } finally {
-      putLock.unlock();
+      lock.writeLock().unlock();
     }
   }
 
@@ -218,21 +175,21 @@ class RegionNormalizerWorkQueue<E> {
    * @return the number of elements in this queue
    */
   public int size() {
-    takeLock.lock();
+    lock.readLock().lock();
     try {
       return delegate.size();
     } finally {
-      takeLock.unlock();
+      lock.readLock().unlock();
     }
   }
 
   @Override
   public String toString() {
-    takeLock.lock();
+    lock.readLock().lock();
     try {
       return delegate.toString();
     } finally {
-      takeLock.unlock();
+      lock.readLock().unlock();
     }
   }
 }

Reply via email to