omalley commented on code in PR #4527:
URL: https://github.com/apache/hadoop/pull/4527#discussion_r1024653352
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1153,9 +1087,51 @@ public void run() {
+ connections.size());
}
+    /**
+     * A thread to write rpc requests to the socket.
+     */
+    private class RpcRequestSender implements Runnable {
+      @Override
+      public void run() {
+        while (!shouldCloseConnection.get()) {
+          ResponseBuffer buf = null;
+          try {
+            Pair<Call, ResponseBuffer> pair = rpcRequestQueue.take();
+            if (shouldCloseConnection.get()) {
+              return;
+            }
+            buf = pair.getRight();
+            synchronized (ipcStreams.out) {
+              if (LOG.isDebugEnabled()) {
+                Call call = pair.getLeft();
+                LOG.debug(getName() + " sending #" + call.id
+                    + " " + call.rpcRequest);
+              }
+              // RpcRequestHeader + RpcRequest
+              ipcStreams.sendRequest(buf.toByteArray());
+              ipcStreams.flush();
+            }
+          } catch (InterruptedException ie) {
+            // stop this thread
Review Comment:
I don't think we should, because it would create a race over which thread
marks the connection closed.
The only place this thread is interrupted is immediately after the Connection
thread itself is interrupted, and the Connection thread already calls
Connection.markClosed(IOException). If this thread also tried to call
markClosed, it would just add non-determinism about which IOException gets logged.
The important part is that this thread stops when it is interrupted. :)
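
To make the intended ownership concrete, here is a minimal, self-contained sketch of that convention (not the actual Client.java code): only the closing side calls markClosed, and the sender thread's InterruptedException handler simply returns. The class and method names (ConnectionSketch, runSender, close) are illustrative stand-ins; only markClosed, shouldCloseConnection, and rpcRequestQueue echo identifiers from the diff above, and the queue is simplified to carry raw byte arrays.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simplified stand-in for the Connection / RpcRequestSender pair.
 * The close path (markClosed + interrupt) is owned by a single thread;
 * the sender thread just returns when it is interrupted.
 */
public class ConnectionSketch {

  private final AtomicBoolean shouldCloseConnection = new AtomicBoolean(false);
  private final BlockingQueue<byte[]> rpcRequestQueue = new LinkedBlockingQueue<>();
  private final Thread senderThread = new Thread(this::runSender, "rpc-request-sender");

  /** Sender loop: drains the queue and exits quietly on interrupt. */
  private void runSender() {
    while (!shouldCloseConnection.get()) {
      try {
        byte[] request = rpcRequestQueue.take();
        // Stand-in for writing RpcRequestHeader + RpcRequest to the socket.
        System.out.println("sent " + request.length + " bytes");
      } catch (InterruptedException ie) {
        // The closing thread has already called markClosed before interrupting
        // us; calling it again here would only race over which IOException gets
        // recorded, so just stop this thread.
        return;
      }
    }
  }

  /** Single owner of the closed state; called only from the closing thread. */
  private void markClosed(IOException cause) {
    if (shouldCloseConnection.compareAndSet(false, true)) {
      System.out.println("connection closed: " + cause.getMessage());
    }
  }

  /** Shutdown path: mark the connection closed first, then stop the sender. */
  public void close(IOException cause) throws InterruptedException {
    markClosed(cause);
    senderThread.interrupt();
    senderThread.join();
  }

  public static void main(String[] args) throws Exception {
    ConnectionSketch c = new ConnectionSketch();
    c.senderThread.start();
    c.rpcRequestQueue.put(new byte[] {1, 2, 3});
    Thread.sleep(100); // let the sender drain the queue before closing
    c.close(new IOException("connection torn down"));
  }
}
```

With a single caller of markClosed (further guarded here by compareAndSet), only one close cause is ever recorded, which is exactly the determinism being argued for.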