This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 6b6902d2b9a HBASE-27768 Race conditions in BlockingRpcConnection (#5154)
6b6902d2b9a is described below
commit 6b6902d2b9a54ad4a194fce701d4aa066301a868
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Mon Apr 10 15:23:03 2023 -0400
HBASE-27768 Race conditions in BlockingRpcConnection (#5154)
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Xiaolin Ha <[email protected]>
---
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 82 +++++++++++++++++++---
1 file changed, 74 insertions(+), 8 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 60e2502524e..d789417aef7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -42,6 +42,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
@@ -98,6 +99,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
justification = "We are always under lock actually")
private Thread thread;
+ // Used for ensuring two reader threads don't run over each other. Should only be used
+ // in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
+ private final Object readerThreadLock = new Object();
+
+ // Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
+ private final AtomicInteger attempts = new AtomicInteger();
+
// connected socket. protected for writing UT.
protected Socket socket = null;
private DataInputStream in;
@@ -325,6 +333,17 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (thread == null) {
return false;
}
+
+ // If closeConn is called while we are in the readResponse method, it's possible that a new
+ // call to setupIOStreams comes in and creates a new value for "thread" before readResponse
+ // finishes. Once readResponse finishes, it will come in here and thread will be non-null
+ // above, but pointing at a new thread. In that case, we should end to avoid a situation
+ // where two threads are forever competing for the same socket.
+ if (!isCurrentThreadExpected()) {
+ LOG.debug("Thread replaced by new connection thread. Ending
waitForWork loop.");
+ return false;
+ }
+
if (!calls.isEmpty()) {
return true;
}
@@ -336,6 +355,25 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
try {
wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
} catch (InterruptedException e) {
+ // Restore interrupt status
+ Thread.currentThread().interrupt();
+
+ String msg = "Interrupted while waiting for work";
+
+ // If we were interrupted by closeConn, it would have set thread to null.
+ // We are synchronized here and if we somehow got interrupted without setting thread to
+ // null, we want to make sure the connection is closed since the read thread would be dead.
+ // Rather than do a null check here, we check if the current thread is the expected thread.
+ // This guards against the case where a call to setupIOStreams got the synchronized lock
+ // first after closeConn, thus changing the thread to a new thread.
+ if (isCurrentThreadExpected()) {
+ LOG.debug(msg + ", closing connection");
+ closeConn(new InterruptedIOException(msg));
+ } else {
+ LOG.debug(msg);
+ }
+
+ return false;
}
}
}
@@ -343,13 +381,24 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
@Override
public void run() {
if (LOG.isTraceEnabled()) {
- LOG.trace(threadName + ": starting");
+ LOG.trace("starting");
}
- while (waitForWork()) {
- readResponse();
+
+ // We have a synchronization here because it's possible in error scenarios for a new
+ // thread to be started while readResponse is still reading on the socket. We don't want
+ // two threads to be reading from the same socket/inputstream.
+ // The below calls can synchronize on "BlockingRpcConnection.this".
+ // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
+ synchronized (readerThreadLock) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("started");
+ }
+ while (waitForWork()) {
+ readResponse();
+ }
}
if (LOG.isTraceEnabled()) {
- LOG.trace(threadName + ": stopped");
+ LOG.trace("stopped");
}
}
@@ -519,7 +568,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
}
// start the receiver thread after the socket connection has been set up
- thread = new Thread(this, threadName);
+ thread = new Thread(this, threadName + " (attempt: " +
attempts.incrementAndGet() + ")");
thread.setDaemon(true);
thread.start();
}
@@ -633,7 +682,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader,
call.param, cellBlock));
} catch (Throwable t) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Error while writing {}", call.toShortString());
+ LOG.trace("Error while writing {}", call.toShortString(), t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
@@ -720,16 +769,33 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
if (LOG.isTraceEnabled()) {
- LOG.trace("ignored", e);
+ LOG.trace("ignored ex for call {}", call, e);
}
} else {
synchronized (this) {
- closeConn(e);
+ // The exception we received may have been caused by another thread closing
+ // this connection. It's possible that before getting to this point, a new connection was
+ // created. In that case, it doesn't help and can actually hurt to close again here.
+ if (isCurrentThreadExpected()) {
+ LOG.debug("Closing connection after error in call {}", call, e);
+ closeConn(e);
+ }
}
}
}
}
+ /**
+ * For use in the reader thread, tests if the current reader thread is the one expected to be
+ * running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
+ * creates a new thread and updates the thread pointer. At that point, the new thread should be
+ * the only one running. We use this method to guard against cases where the old thread may be
+ * erroneously running or closing the connection in error states.
+ */
+ private boolean isCurrentThreadExpected() {
+ return thread == Thread.currentThread();
+ }
+
@Override
protected synchronized void callTimeout(Call call) {
// call sender