ZanderXu commented on code in PR #5371:
URL: https://github.com/apache/hadoop/pull/5371#discussion_r1100919631
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1159,29 +1159,35 @@ public void run() {
*/
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
- if (shouldCloseConnection.get()) {
- return;
- }
-
- // Serialize the call to be sent. This is done from the actual
- // caller thread, rather than the rpcRequestThread in the connection,
- // so that if the serialization throws an error, it is reported
- // properly. This also parallelizes the serialization.
- //
- // Format of a call on the wire:
- // 0) Length of rest below (1 + 2)
- // 1) RpcRequestHeader - is serialized Delimited hence contains length
- // 2) RpcRequest
- //
- // Items '1' and '2' are prepared here.
- RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
- call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
- clientId, call.alignmentContext);
+ if (!shouldCloseConnection.get()) {
- final ResponseBuffer buf = new ResponseBuffer();
- header.writeDelimitedTo(buf);
- RpcWritable.wrap(call.rpcRequest).writeTo(buf);
- rpcRequestQueue.put(Pair.of(call, buf));
+ // Serialize the call to be sent. This is done from the actual
+ // caller thread, rather than the rpcRequestThread in the connection,
+ // so that if the serialization throws an error, it is reported
+ // properly. This also parallelizes the serialization.
+ //
+ // Format of a call on the wire:
+ // 0) Length of rest below (1 + 2)
+ // 1) RpcRequestHeader - is serialized Delimited hence contains length
+ // 2) RpcRequest
+ //
+ // Items '1' and '2' are prepared here.
+ RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
+ call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
+ clientId, call.alignmentContext);
+
+ final ResponseBuffer buf = new ResponseBuffer();
+ header.writeDelimitedTo(buf);
+ RpcWritable.wrap(call.rpcRequest).writeTo(buf);
+ // Wait for the message to be sent. We offer with timeout to
+ // prevent a race condition between checking the shouldCloseConnection
+ // and the stopping of the polling thread
+ while (!shouldCloseConnection.get()) {
+ if (rpcRequestQueue.offer(Pair.of(call, buf), 1, TimeUnit.SECONDS)) {
+ return;
Review Comment:
`return;` -> `break;`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]