[
https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513673#comment-17513673
]
Wenzhe Zhou edited comment on HADOOP-15720 at 3/28/22, 10:01 PM:
-----------------------------------------------------------------
Hi Hadoop team, is anyone actively working on this issue? Any ETA? Is
there a workaround?
was (Author: wzhou):
Hi Hadoop team, is anyone actively working on this issue? Any ETA?
> rpcTimeout may not have been applied correctly
> ----------------------------------------------
>
> Key: HADOOP-15720
> URL: https://issues.apache.org/jira/browse/HADOOP-15720
> Project: Hadoop Common
> Issue Type: Bug
> Components: common
> Reporter: Yongjun Zhang
> Priority: Major
>
> org.apache.hadoop.ipc.Client sends multiple RPC calls to the server synchronously
> via the same connection, as in the following synchronized code block:
> {code:java}
> synchronized (sendRpcRequestLock) {
>   Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
>     @Override
>     public void run() {
>       try {
>         synchronized (Connection.this.out) {
>           if (shouldCloseConnection.get()) {
>             return;
>           }
>
>           if (LOG.isDebugEnabled()) {
>             LOG.debug(getName() + " sending #" + call.id
>                 + " " + call.rpcRequest);
>           }
>
>           byte[] data = d.getData();
>           int totalLength = d.getLength();
>           out.writeInt(totalLength); // Total Length
>           out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
>           out.flush();
>         }
>       } catch (IOException e) {
>         // exception at this point would leave the connection in an
>         // unrecoverable state (eg half a call left on the wire).
>         // So, close the connection, killing any outstanding calls
>         markClosed(e);
>       } finally {
>         // the buffer is just an in-memory buffer, but it is still polite to
>         // close early
>         IOUtils.closeStream(d);
>       }
>     }
>   });
>
>   try {
>     senderFuture.get();
>   } catch (ExecutionException e) {
>     Throwable cause = e.getCause();
>
>     // cause should only be a RuntimeException as the Runnable above
>     // catches IOException
>     if (cause instanceof RuntimeException) {
>       throw (RuntimeException) cause;
>     } else {
>       throw new RuntimeException("unexpected checked exception", cause);
>     }
>   }
> }
> {code}
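The send path above frames each request as a 4-byte length followed by the serialized bytes, and the receive path below starts with `in.readInt()`. A minimal sketch of that length-prefixed framing over plain `java.io` streams, assuming a raw byte payload in place of the protobuf RpcRequestHeader + RpcRequest. The class and method names are hypothetical, not Hadoop's API:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of length-prefixed framing on one shared stream. */
public class LengthPrefixedFraming {
    // Plays the role of sendRpcRequestLock: frames from different callers must not interleave.
    private final Object sendLock = new Object();

    /** Writes one frame: 4-byte big-endian length, then the payload bytes. */
    public void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        synchronized (sendLock) {
            out.writeInt(payload.length);
            out.write(payload, 0, payload.length);
            out.flush();
        }
    }

    /** Reads one frame written by writeFrame. */
    public static byte[] readFrame(DataInputStream in) throws IOException {
        int totalLen = in.readInt();    // same first step as receiveRpcResponse()
        byte[] payload = new byte[totalLen];
        in.readFully(payload);          // blocks until the whole frame has arrived
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        LengthPrefixedFraming framing = new LengthPrefixedFraming();
        framing.writeFrame(out, "call1".getBytes(StandardCharsets.UTF_8));
        framing.writeFrame(out, "call2".getBytes(StandardCharsets.UTF_8));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8)); // call1
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8)); // call2
    }
}
{code}

Note that the framing by itself says nothing about which call a frame belongs to; that is what the call id in the response header is for.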
> It then waits for the result asynchronously via:
> {code:java}
> /* Receive a response.
>  * Because only one receiver, so no synchronization on in.
>  */
> private void receiveRpcResponse() {
>   if (shouldCloseConnection.get()) {
>     return;
>   }
>   touch();
>
>   try {
>     int totalLen = in.readInt();
>     RpcResponseHeaderProto header =
>         RpcResponseHeaderProto.parseDelimitedFrom(in);
>     checkResponse(header);
>     int headerLen = header.getSerializedSize();
>     headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
>     int callId = header.getCallId();
>     if (LOG.isDebugEnabled())
>       LOG.debug(getName() + " got value #" + callId);
>     Call call = calls.get(callId);
>     RpcStatusProto status = header.getStatus();
>     ......
> {code}
> However, responses may arrive in any order, so the {{call}} looked up by
> {{receiveRpcResponse()}} above may belong to any of the outstanding requests.
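A minimal sketch of why order does not matter on the receive side: a single receiver routes each response by its call id, in whatever order the server sends them. It assumes a simplified, hypothetical `PendingCall` and a `String` payload; it is not Hadoop's `Client.Call` or its `calls` table, although it mirrors the `calls.get(callId)` lookup quoted above:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: responses are matched to calls by id, not by send order. */
public class ResponseDemux {
    /** Stand-in for an outstanding call; only the fields needed here. */
    public static final class PendingCall {
        final int id;
        volatile String response; // written by the receiver thread, read by the caller

        PendingCall(int id) { this.id = id; }
    }

    private final Map<Integer, PendingCall> calls = new ConcurrentHashMap<>();

    /** Caller side: register the call before its request is sent. */
    public PendingCall register(int id) {
        PendingCall call = new PendingCall(id);
        calls.put(id, call);
        return call;
    }

    /** Receiver side: route one response by the call id from its header. */
    public PendingCall onResponse(int callId, String payload) {
        PendingCall call = calls.remove(callId);
        if (call != null) {
            call.response = payload;
        }
        return call; // null if the id is unknown
    }

    public int outstanding() { return calls.size(); }

    public static void main(String[] args) {
        ResponseDemux demux = new ResponseDemux();
        PendingCall call1 = demux.register(1);
        PendingCall call2 = demux.register(2);
        demux.onResponse(2, "result-2"); // call2's response arrives first
        demux.onResponse(1, "result-1");
        System.out.println(call1.response + " " + call2.response); // result-1 result-2
    }
}
{code}

Because the receiver's blocking read is not tied to any particular call, a timeout measured inside that read cannot stand for the deadline of any single call.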
> The following code
> {code:java}
> int totalLen = in.readInt();
> {code}
> eventually calls one of the following two methods, where the accumulated wait
> is checked against rpcTimeout:
> {code:java}
> /** Read a byte from the stream.
>  * Send a ping if timeout on read. Retries if no failure is detected
>  * until a byte is read.
>  * @throws IOException for any IO problem other than socket timeout
>  */
> @Override
> public int read() throws IOException {
>   int waiting = 0;
>   do {
>     try {
>       return super.read();
>     } catch (SocketTimeoutException e) {
>       waiting += soTimeout;
>       handleTimeout(e, waiting);
>     }
>   } while (true);
> }
>
> /** Read bytes into a buffer starting from offset <code>off</code>
>  * Send a ping if timeout on read. Retries if no failure is detected
>  * until a byte is read.
>  *
>  * @return the total number of bytes read; -1 if the connection is closed.
>  */
> @Override
> public int read(byte[] buf, int off, int len) throws IOException {
>   int waiting = 0;
>   do {
>     try {
>       return super.read(buf, off, len);
>     } catch (SocketTimeoutException e) {
>       waiting += soTimeout;
>       handleTimeout(e, waiting);
>     }
>   } while (true);
> }
> {code}
> But the waiting time is reset to 0 at the start of every call to either read
> method, so each read can block for up to rpcTimeout. The effective time before
> a call times out therefore appears to be cumulative.
> For example, suppose the client issues call1 and call2 and then waits for the
> results. If call1 takes (rpcTimeout - 1), it does not time out; if call2 then
> takes another (rpcTimeout - 1), it does not time out either. Yet call2
> effectively took 2*(rpcTimeout - 1), which can be much larger than rpcTimeout
> and should have timed out.
> In the worst case, an RPC may take indeterminately long and never time out.
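To make the arithmetic concrete, here is a virtual-time simulation of the retry loop quoted above. It is not Hadoop code: it assumes, for illustration only, a 100 ms socket timeout, a 1000 ms rpcTimeout, and a `handleTimeout` that simply throws once `waiting >= rpcTimeout` (the real `handleTimeout` is not shown in this issue and may also send pings):

{code:java}
import java.net.SocketTimeoutException;

/** Hypothetical virtual-time simulation of the per-read timeout check. */
public class PerReadTimeoutSim {
    static final int SO_TIMEOUT_MS = 100;   // illustrative socket read timeout
    static final int RPC_TIMEOUT_MS = 1000; // illustrative rpcTimeout

    /**
     * Models one read() whose data arrives arrivalMs after the read starts.
     * Returns the virtual time spent, or throws once 'waiting' reaches rpcTimeout.
     */
    static int simulatedRead(int arrivalMs) throws SocketTimeoutException {
        int waiting = 0; // reset on every read(), as in the quoted code
        int elapsed = 0;
        while (true) {
            if (arrivalMs - elapsed < SO_TIMEOUT_MS) {
                return arrivalMs; // data arrived before the next socket timeout
            }
            elapsed += SO_TIMEOUT_MS; // a socket timeout fired; retry
            waiting += SO_TIMEOUT_MS;
            if (waiting >= RPC_TIMEOUT_MS) { // assumed handleTimeout behaviour
                throw new SocketTimeoutException("read waited " + waiting + " ms");
            }
        }
    }

    public static void main(String[] args) throws SocketTimeoutException {
        int perResponse = RPC_TIMEOUT_MS - 1;
        // The receiver reads call1's response, then starts a fresh read for call2's.
        int total = simulatedRead(perResponse) + simulatedRead(perResponse);
        System.out.println("total = " + total + " ms, rpcTimeout = " + RPC_TIMEOUT_MS + " ms");
        // prints: total = 1998 ms, rpcTimeout = 1000 ms
    }
}
{code}

Neither read throws, yet call2's response arrives 1998 ms after both calls were sent, because each read starts its `waiting` counter from zero.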
> It seems more accurate to record the time when an RPC is sent to the
> server, and then check for the timeout here:
> {code:java}
> public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>     ConnectionId remoteId, int serviceClass,
>     AtomicBoolean fallbackToSimpleAuth) throws IOException {
>   final Call call = createCall(rpcKind, rpcRequest);
>   Connection connection = getConnection(remoteId, call, serviceClass,
>       fallbackToSimpleAuth);
>   try {
>     connection.sendRpcRequest(call); // send the rpc request
>   } catch (RejectedExecutionException e) {
>     throw new IOException("connection has been closed", e);
>   } catch (InterruptedException e) {
>     Thread.currentThread().interrupt();
>     LOG.warn("interrupted waiting to send rpc request to server", e);
>     throw new IOException(e);
>   }
>   synchronized (call) {
>     while (!call.done) {
>       try {
>         call.wait(); // wait for the result
>       } catch (InterruptedException ie) {
>         Thread.currentThread().interrupt();
>         throw new InterruptedIOException("Call interrupted");
>       } // <= should check how long it has waited here; time out if
>         //    rpcTimeout has been reached
>     }
>     if (call.error != null) {
>       if (call.error instanceof RemoteException) {
>         call.error.fillInStackTrace();
>         throw call.error;
>       } else { // local exception
>         InetSocketAddress address = connection.getRemoteAddress();
>         throw NetUtils.wrapException(address.getHostName(),
>             address.getPort(),
>             NetUtils.getHostname(),
>             0,
>             call.error);
>       }
>     } else {
>       return call.getRpcResponse();
>     }
>   }
> }
> {code}
> Basically, we should change the {{call.wait()}} highlighted above from
> {code:java}
> public final void wait() throws InterruptedException
> {code}
> to
> {code:java}
> public final void wait(long timeout, int nanos) throws InterruptedException
> {code}
> and apply rpcTimeout as the timeout value here, keeping track of how long the
> loop has already waited. (Note that I'm ignoring the time needed to send the
> RPC to the server; ideally that should be included too, so that rpcTimeout
> means what it is intended to mean.)
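A minimal sketch of that change, assuming a simplified, hypothetical `Call` with only a `done` flag, and assuming a non-positive rpcTimeout means "no timeout". It computes one deadline before the loop, so repeated or spurious wakeups do not restart the clock, and passes only the remaining time to `wait(long, int)`:

{code:java}
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

public class DeadlineWait {
    /** Hypothetical stand-in for Client.Call: only the completion flag. */
    public static final class Call {
        private boolean done; // guarded by this

        public synchronized void setDone() {
            done = true;
            notifyAll(); // wake the caller blocked in awaitCall()
        }
    }

    /** Blocks until the call is done or rpcTimeoutMs has elapsed in total. */
    public static void awaitCall(Call call, long rpcTimeoutMs) throws IOException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
        synchronized (call) {
            while (!call.done) {
                try {
                    if (rpcTimeoutMs <= 0) { // assumption: no timeout configured
                        call.wait();
                        continue;
                    }
                    long remainingNanos = deadline - System.nanoTime();
                    if (remainingNanos <= 0) {
                        throw new SocketTimeoutException(
                            "call not done after " + rpcTimeoutMs + " ms");
                    }
                    // Wait only for the time left, not for a fresh rpcTimeout.
                    call.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos),
                              (int) (remainingNanos % 1_000_000));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("Call interrupted");
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Call fast = new Call();
        new Thread(() -> {
            try { Thread.sleep(20); } catch (InterruptedException ignored) { }
            fast.setDone();
        }).start();
        awaitCall(fast, 1000); // returns once the other thread marks it done

        try {
            awaitCall(new Call(), 50); // nobody completes this one
        } catch (SocketTimeoutException e) {
            System.out.println("timed out as expected: " + e.getMessage());
        }
    }
}
{code}

Using a single `System.nanoTime()` deadline rather than calling `wait(rpcTimeout)` on each loop iteration is the design choice that keeps the total wait bounded by rpcTimeout.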
> Hi [~daryn] and [~kihwal], would you please take a look at my analysis
> above to see if I have misunderstood anything?
> Thanks a lot.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)