This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6b6902d2b9a HBASE-27768 Race conditions in BlockingRpcConnection (#5154)
6b6902d2b9a is described below

commit 6b6902d2b9a54ad4a194fce701d4aa066301a868
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Mon Apr 10 15:23:03 2023 -0400

    HBASE-27768 Race conditions in BlockingRpcConnection (#5154)
    
    Signed-off-by: Duo Zhang <[email protected]>
    Signed-off-by: Xiaolin Ha <[email protected]>
---
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    | 82 +++++++++++++++++++---
 1 file changed, 74 insertions(+), 8 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 60e2502524e..d789417aef7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -42,6 +42,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.security.sasl.SaslException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
@@ -98,6 +99,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
       justification = "We are always under lock actually")
   private Thread thread;
 
+  // Used for ensuring two reader threads don't run over each other. Should only be used
+  // in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
+  private final Object readerThreadLock = new Object();
+
+  // Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
+  private final AtomicInteger attempts = new AtomicInteger();
+
   // connected socket. protected for writing UT.
   protected Socket socket = null;
   private DataInputStream in;
@@ -325,6 +333,17 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
       if (thread == null) {
         return false;
       }
+
+      // If closeConn is called while we are in the readResponse method, it's possible that a new
+      // call to setupIOStreams comes in and creates a new value for "thread" before readResponse
+      // finishes. Once readResponse finishes, it will come in here and thread will be non-null
+      // above, but pointing at a new thread. In that case, we should end to avoid a situation
+      // where two threads are forever competing for the same socket.
+      if (!isCurrentThreadExpected()) {
+        LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop.");
+        return false;
+      }
+
       if (!calls.isEmpty()) {
         return true;
       }
@@ -336,6 +355,25 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
       try {
         wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
       } catch (InterruptedException e) {
+        // Restore interrupt status
+        Thread.currentThread().interrupt();
+
+        String msg = "Interrupted while waiting for work";
+
+        // If we were interrupted by closeConn, it would have set thread to null.
+        // We are synchronized here and if we somehow got interrupted without setting thread to
+        // null, we want to make sure the connection is closed since the read thread would be dead.
+        // Rather than do a null check here, we check if the current thread is the expected thread.
+        // This guards against the case where a call to setupIOStreams got the synchronized lock
+        // first after closeConn, thus changing the thread to a new thread.
+        if (isCurrentThreadExpected()) {
+          LOG.debug(msg + ", closing connection");
+          closeConn(new InterruptedIOException(msg));
+        } else {
+          LOG.debug(msg);
+        }
+
+        return false;
       }
     }
   }
@@ -343,13 +381,24 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
   @Override
   public void run() {
     if (LOG.isTraceEnabled()) {
-      LOG.trace(threadName + ": starting");
+      LOG.trace("starting");
     }
-    while (waitForWork()) {
-      readResponse();
+
+    // We have a synchronization here because it's possible in error scenarios for a new
+    // thread to be started while readResponse is still reading on the socket. We don't want
+    // two threads to be reading from the same socket/inputstream.
+    // The below calls can synchronize on "BlockingRpcConnection.this".
+    // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
+    synchronized (readerThreadLock) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("started");
+      }
+      while (waitForWork()) {
+        readResponse();
+      }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace(threadName + ": stopped");
+      LOG.trace("stopped");
     }
   }
 
@@ -519,7 +568,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
     }
 
     // start the receiver thread after the socket connection has been set up
-    thread = new Thread(this, threadName);
+    thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")");
     thread.setDaemon(true);
     thread.start();
   }
@@ -633,7 +682,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
         call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
       } catch (Throwable t) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Error while writing {}", call.toShortString());
+          LOG.trace("Error while writing {}", call.toShortString(), t);
         }
         IOException e = IPCUtil.toIOE(t);
         closeConn(e);
@@ -720,16 +769,33 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
         // since we expect certain responses to not make it by the specified
         // {@link ConnectionId#rpcTimeout}.
         if (LOG.isTraceEnabled()) {
-          LOG.trace("ignored", e);
+          LOG.trace("ignored ex for call {}", call, e);
         }
       } else {
         synchronized (this) {
-          closeConn(e);
+          // The exception we received may have been caused by another thread closing
+          // this connection. It's possible that before getting to this point, a new connection was
+          // created. In that case, it doesn't help and can actually hurt to close again here.
+          if (isCurrentThreadExpected()) {
+            LOG.debug("Closing connection after error in call {}", call, e);
+            closeConn(e);
+          }
         }
       }
     }
   }
 
+  /**
+   * For use in the reader thread, tests if the current reader thread is the one expected to be
+   * running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
+   * creates a new thread and updates the thread pointer. At that point, the new thread should be
+   * the only one running. We use this method to guard against cases where the old thread may be
+   * erroneously running or closing the connection in error states.
+   */
+  private boolean isCurrentThreadExpected() {
+    return thread == Thread.currentThread();
+  }
+
   @Override
   protected synchronized void callTimeout(Call call) {
     // call sender

Reply via email to