[
https://issues.apache.org/jira/browse/HDFS-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17685616#comment-17685616
]
ASF GitHub Bot commented on HDFS-16853:
---------------------------------------
virajjasani commented on code in PR #5366:
URL: https://github.com/apache/hadoop/pull/5366#discussion_r1099397087
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
RpcWritable.wrap(call.rpcRequest).writeTo(buf);
- rpcRequestQueue.put(Pair.of(call, buf));
+ queueIfActive(call, buf);
+ }
+
+ /**
+ * Queue an operation into the request queue,
+ * waiting if necessary for the queue to have a thread to process it.
+ * If the connection is closed, downgrades to a no-op
+ * @param call call to queue
+ * @param buf buffer for response
+ * @throws InterruptedException interrupted while waiting for a free thread.
+ */
+ private void queueIfActive(
+ final Call call,
+ final ResponseBuffer buf) throws InterruptedException {
+ // Get the request queue.
+ // done in a synchronized block to avoid a race condition where
+ // a call is queued after the connection has been closed
+ final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+ acquireActiveRequestQueue();
+ if (queue != null) {
+ try {
+ queue.put(Pair.of(call, buf));
+ } finally {
+ // release the reservation afterwards.
+ releaseQueueReservation();
+ }
+ } else {
+ LOG.debug("Discarding queued call as IPC client is stopped");
+ }
+ }
+
+ /**
+ * Get the active rpc request queue.
+ * If the connection is closed, returns null.
+ * This method is synchronized, as are the operations to set
+ * the {@link #shouldCloseConnection} and {@link #running}
+ * atomic booleans, therefore this entire method will complete in the
+ * same block. However, the returned queue may be used outside of
+ * a synchronous block, where this guarantee no longer holds.
+ * A queue reservation counter is used to track this.
+ * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+ * @return the queue or null.
+ */
+ private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> acquireActiveRequestQueue() {
+ if (shouldCloseConnection.get() || !running.get()) {
+ LOG.debug("IPC client is stopped");
Review Comment:
nit: `LOG.debug("IPC client {} is stopped", this)` would be great
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1241,9 +1301,28 @@ private void receiveRpcResponse() {
markClosed(e);
}
}
-
+
+ /**
+ * Mark the connection as closed due to an exception.
+ * Sets the {@link #shouldCloseConnection} boolean to true,
+ * and, if it was false earlier, drains the queue before
+ * notifying any waiting objects.
+ * @param e exception which triggered the closure; may be null.
+ */
private synchronized void markClosed(IOException e) {
if (shouldCloseConnection.compareAndSet(false, true)) {
+ Pair<Call, ResponseBuffer> request;
+ while ((request = rpcRequestQueue.poll()) != null) {
+ LOG.debug("Clean {} from RpcRequestQueue.", request.getLeft());
+ }
+ if (queueReservations.get() > 0) {
+ // there's still an active reservation.
+ // either a new put() is about to happen (bad),
+ // or it has happened but the finally {} clause has not been invoked (good).
+ // without knowing which, print a warning message so at least logs on
+ // a deadlock are meaningful.
+ LOG.warn("Possible overlap in queue shutdown and request");
Review Comment:
We can also print `queueReservations.get()` with this log
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
RpcWritable.wrap(call.rpcRequest).writeTo(buf);
- rpcRequestQueue.put(Pair.of(call, buf));
+ queueIfActive(call, buf);
+ }
+
+ /**
+ * Queue an operation into the request queue,
+ * waiting if necessary for the queue to have a thread to process it.
+ * If the connection is closed, downgrades to a no-op
+ * @param call call to queue
+ * @param buf buffer for response
+ * @throws InterruptedException interrupted while waiting for a free thread.
+ */
+ private void queueIfActive(
+ final Call call,
+ final ResponseBuffer buf) throws InterruptedException {
+ // Get the request queue.
+ // done in a synchronized block to avoid a race condition where
+ // a call is queued after the connection has been closed
+ final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+ acquireActiveRequestQueue();
+ if (queue != null) {
+ try {
+ queue.put(Pair.of(call, buf));
+ } finally {
+ // release the reservation afterwards.
+ releaseQueueReservation();
+ }
+ } else {
+ LOG.debug("Discarding queued call as IPC client is stopped");
+ }
+ }
+
+ /**
+ * Get the active rpc request queue.
+ * If the connection is closed, returns null.
+ * This method is synchronized, as are the operations to set
+ * the {@link #shouldCloseConnection} and {@link #running}
+ * atomic booleans, therefore this entire method will complete in the
+ * same block. However, the returned queue may be used outside of
+ * a synchronous block, where this guarantee no longer holds.
+ * A queue reservation counter is used to track this.
+ * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+ * @return the queue or null.
+ */
+ private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> acquireActiveRequestQueue() {
+ if (shouldCloseConnection.get() || !running.get()) {
Review Comment:
We do not need additional synchronization on the `putLock` object while
accessing `running`, correct?
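For reference, the reservation pattern described in the javadoc above can be sketched in isolation. This is only a minimal illustration under stated assumptions: the class `ReservationSketch` and its method names are hypothetical stand-ins, not the real `Client.Connection` code, and the sketch does not settle whether `putLock` is needed. It only shows the check-and-reserve step running under the same monitor as the close.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the connection; not the real Client.Connection. */
public class ReservationSketch {
  private final AtomicBoolean shouldClose = new AtomicBoolean(false);
  private final AtomicInteger reservations = new AtomicInteger();
  private final SynchronousQueue<String> queue = new SynchronousQueue<>();

  /**
   * Check the closed flag and take a reservation in one synchronized step.
   * Returns null once the connection has been marked closed.
   */
  public synchronized SynchronousQueue<String> acquire() {
    if (shouldClose.get()) {
      return null;
    }
    reservations.incrementAndGet();
    return queue;
  }

  /** Give the reservation back; callers do this in a finally block. */
  public void release() {
    reservations.decrementAndGet();
  }

  /**
   * Mark the connection closed under the same monitor as acquire().
   * Returns the number of reservations still outstanding at that moment,
   * which is the value a warning message could report.
   */
  public synchronized int markClosed() {
    shouldClose.set(true);
    return reservations.get();
  }

  /** Current number of outstanding reservations. */
  public int outstanding() {
    return reservations.get();
  }
}
```

Because `acquire()` and `markClosed()` share one monitor, a caller either gets the queue with a counted reservation before the close or gets null after it. The put itself still happens outside the monitor, which is why the counter exists at all.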
> The UT TestLeaseRecovery2#testHardLeaseRecoveryAfterNameNodeRestart failed
> because HADOOP-18324
> -----------------------------------------------------------------------------------------------
>
> Key: HDFS-16853
> URL: https://issues.apache.org/jira/browse/HDFS-16853
> Project: Hadoop HDFS
> Issue Type: Bug
> Affects Versions: 3.3.5
> Reporter: ZanderXu
> Assignee: ZanderXu
> Priority: Blocker
> Labels: pull-request-available
>
> The UT TestLeaseRecovery2#testHardLeaseRecoveryAfterNameNodeRestart failed
> with the error message: Waiting for cluster to become active. The jstack of
> the blocked thread is as follows:
> {code:java}
> "BP-1618793397-192.168.3.4-1669198559828 heartbeating to localhost/127.0.0.1:54673" #260 daemon prio=5 os_prio=31 tid=0x00007fc1108fa000 nid=0x19303 waiting on condition [0x0000700017884000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007430a9ec0> (a java.util.concurrent.SynchronousQueue$TransferQueue)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.SynchronousQueue$TransferQueue.awaitFulfill(SynchronousQueue.java:762)
> at java.util.concurrent.SynchronousQueue$TransferQueue.transfer(SynchronousQueue.java:695)
> at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:877)
> at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1186)
> at org.apache.hadoop.ipc.Client.call(Client.java:1482)
> at org.apache.hadoop.ipc.Client.call(Client.java:1429)
> at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258)
> at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139)
> at com.sun.proxy.$Proxy23.sendHeartbeat(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.sendHeartbeat(DatanodeProtocolClientSideTranslatorPB.java:168)
> at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.sendHeartBeat(BPServiceActor.java:570)
> at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:714)
> at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:915)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> After looking into the code, I found that this bug was introduced by
> HADOOP-18324: RpcRequestSender exited without cleaning up the
> rpcRequestQueue, which left BPServiceActor blocked while sending a request.
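The blocking behaviour behind the stack trace can be reproduced outside Hadoop. A `SynchronousQueue` has no capacity, so `put()` parks the caller until another thread takes the element. If the consuming thread has already exited, nothing ever wakes the producer. The sketch below is a minimal illustration, not Hadoop code: the class `HandoffSketch` and its methods are hypothetical, and it uses the timed `offer()` in place of `put()` so that the no-consumer case returns instead of hanging.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; not part of Hadoop. */
public class HandoffSketch {

  /** Offer with no consumer thread: the handoff cannot complete. */
  public static boolean offerWithoutConsumer(long timeoutMillis) {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    try {
      // put() would park here forever; the timed offer gives up instead.
      return queue.offer("heartbeat", timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Offer while a consumer thread is taking: the handoff succeeds. */
  public static boolean offerWithConsumer(long timeoutMillis) {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    Thread consumer = new Thread(() -> {
      try {
        queue.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.setDaemon(true);
    consumer.start();
    try {
      boolean accepted =
          queue.offer("heartbeat", timeoutMillis, TimeUnit.MILLISECONDS);
      consumer.join(timeoutMillis);
      return accepted;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("no consumer:   " + offerWithoutConsumer(100));
    System.out.println("with consumer: " + offerWithConsumer(2000));
  }
}
```

With no consumer the timed offer returns false after the timeout; with a live consumer it returns true. An untimed `put()` in the first case never returns, which matches the heartbeat thread parked in `SynchronousQueue.put`.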