[ https://issues.apache.org/jira/browse/HADOOP-15530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yongjun Zhang updated HADOOP-15530:
-----------------------------------
    Description:
In Client.java, sendRpcRequest does the following:
{code}
    /** Initiates a rpc call by sending the rpc request to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     * @param call - the rpc request
     */
    public void sendRpcRequest(final Call call)
        throws InterruptedException, IOException {
      if (shouldCloseConnection.get()) {
        return;
      }

      // Serialize the call to be sent. This is done from the actual
      // caller thread, rather than the sendParamsExecutor thread,
      // so that if the serialization throws an error, it is reported
      // properly. This also parallelizes the serialization.
      //
      // Format of a call on the wire:
      // 0) Length of rest below (1 + 2)
      // 1) RpcRequestHeader - is serialized Delimited hence contains length
      // 2) RpcRequest
      //
      // Items '1' and '2' are prepared here.
      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
          clientId);

      final ResponseBuffer buf = new ResponseBuffer();
      header.writeDelimitedTo(buf);
      RpcWritable.wrap(call.rpcRequest).writeTo(buf);

      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (ipcStreams.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }
                if (LOG.isDebugEnabled()) {
                  LOG.debug(getName() + " sending #" + call.id
                      + " " + call.rpcRequest);
                }
                // RpcRequestHeader + RpcRequest
                ipcStreams.sendRequest(buf.toByteArray());
                ipcStreams.flush();
              }
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
              markClosed(e);
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite to
              // close early
              IOUtils.closeStream(buf);
            }
          }
        });

        try {
          senderFuture.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();

          // cause should only be a RuntimeException as the Runnable above
          // catches IOException
          if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else {
            throw new RuntimeException("unexpected checked exception", cause);
          }
        }
      }
    }
{code}

It has been observed that the call can get stuck at {{senderFuture.get();}} with the following stack:
{code}
"Thread-13" #40 prio=5 os_prio=0 tid=0x000000000fb0d000 nid=0xf189c waiting on condition [0x00007f697c582000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006187e5ec0> (a java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1088)
        - locked <0x00000006215c1e08> (a java.lang.Object)
        at org.apache.hadoop.ipc.Client.call(Client.java:1483)
        at org.apache.hadoop.ipc.Client.call(Client.java:1441)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:266)
        at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1323)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1310)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1298)
        at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
        at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:275)
        - locked <0x00000006187e5530> (a java.lang.Object)
        at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1629)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:338)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:334)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)
{code}

Given that we support rpcTimeOut, we could choose the second of the two Future methods below:
{code}
    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
{code}

In theory, since sending RPC requests at the client is serialized, we could just do the send in the caller thread instead of using a thread pool to create a new thread. This can be discussed in a separate jira.

Why the RPC is not processed and answered by the NN is a separate topic (HADOOP-15538).

> RPC could stuck at senderFuture.get()
> -------------------------------------
>
>                 Key: HADOOP-15530
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15530
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>            Reporter: Yongjun Zhang
>            Assignee: Yongjun Zhang
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
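
The timed wait suggested in the description can be sketched roughly as follows. This is a minimal standalone illustration, not the actual Hadoop change: the class name TimedSendSketch, the helper awaitSend, and the 200 ms timeout are hypothetical names and values chosen for the example, and the "stuck" sender is simulated with a latch that is never released. It only shows how {{get(long, TimeUnit)}} lets the caller stop waiting, while keeping the same ExecutionException handling as the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedSendSketch {

  /**
   * Hypothetical helper: waits for the sender task for at most timeoutMs
   * milliseconds. Returns true if the task finished in time; on timeout it
   * cancels the task (with an interrupt) and returns false.
   */
  static boolean awaitSend(Future<?> senderFuture, long timeoutMs)
      throws InterruptedException {
    try {
      senderFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      // The caller is no longer blocked forever; give up on this send.
      senderFuture.cancel(true);
      return false;
    } catch (ExecutionException e) {
      // Same unwrapping as in the sendRpcRequest code quoted above.
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new RuntimeException("unexpected checked exception", cause);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch neverReleased = new CountDownLatch(1);
    try {
      // Simulated sender that blocks until it is interrupted.
      Future<?> stuck = executor.submit(() -> {
        try {
          neverReleased.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      System.out.println("stuck send completed: " + awaitSend(stuck, 200));

      // A sender that returns immediately completes within the timeout.
      Future<?> quick = executor.submit(() -> { });
      System.out.println("quick send completed: " + awaitSend(quick, 200));
    } finally {
      executor.shutdownNow();
    }
  }
}
```

In a real client, a timeout would probably also need to mark the connection as closed, since a partially written request leaves the stream in an unusable state. Whether interrupting the sender thread actually unblocks the underlying write depends on the stream implementation, which this sketch does not address.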