Repository: hbase Updated Branches: refs/heads/master f20fac41d -> 9f4b6ac06
HBASE-11835 Wrong management of non expected calls in the client (Nicolas Liochon) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9f4b6ac0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9f4b6ac0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9f4b6ac0 Branch: refs/heads/master Commit: 9f4b6ac06c35fb18acd1951382da024780e01e2f Parents: f20fac4 Author: stack <[email protected]> Authored: Thu Oct 30 12:41:54 2014 -0700 Committer: stack <[email protected]> Committed: Thu Oct 30 12:41:54 2014 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 20 +++++----- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 42 ++++++++++++-------- 2 files changed, 37 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9f4b6ac0/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 225a2c9..4586e3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -338,7 +338,8 @@ public class RpcClient { protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>> tokenHandlers = - new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends TokenIdentifier>>(); + new HashMap<AuthenticationProtos.TokenIdentifier.Kind, + TokenSelector<? 
extends TokenIdentifier>>(); static { tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector()); @@ -652,18 +653,21 @@ public class RpcClient { socket.getOutputStream().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } try { if (socket.getInputStream() != null) { socket.getInputStream().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } try { if (socket.getChannel() != null) { socket.getChannel().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } try { socket.close(); @@ -1161,18 +1165,18 @@ public class RpcClient { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); + return; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); - if (expectedCall) call.setException(re); + call.setException(re); if (isFatalConnectionException(exceptionResponse)) { markClosed(re); } } else { Message value = null; - // Call may be null because it may have timeout and been cleaned up on this side already - if (expectedCall && call.responseDefaultType != null) { + if (call.responseDefaultType != null) { Builder builder = call.responseDefaultType.newBuilderForType(); builder.mergeDelimitedFrom(in); value = builder.build(); @@ -1184,9 +1188,7 @@ public class RpcClient { IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); } - // it's possible that this call may have been cleaned up due to a RPC - // timeout, so check if it 
still exists before setting the value. - if (expectedCall) call.setResponse(value, cellBlockScanner); + call.setResponse(value, cellBlockScanner); } } catch (IOException e) { if (expectedCall) call.setException(e); @@ -1194,6 +1196,7 @@ public class RpcClient { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. + if (LOG.isTraceEnabled()) LOG.trace("ignored", e); } else { // Treat this as a fatal condition and close this connection markClosed(e); @@ -1489,7 +1492,6 @@ public class RpcClient { @Override public void run(Object parameter) { connection.callSender.remove(cts); - call.callComplete(); } }); if (pcrc.isCanceled()) { @@ -1662,7 +1664,7 @@ public class RpcClient { public int hashCode() { int hashcode = (address.hashCode() + PRIME * (PRIME * this.serviceName.hashCode() ^ - (ticket == null ? 0 : ticket.hashCode()) )); + (ticket == null ? 0 : ticket.hashCode()))); return hashcode; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9f4b6ac0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 1799d80..4c72714 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -685,6 +685,7 @@ public class RpcServer implements RpcServerInterface { doAccept(key); } } catch (IOException ignored) { + if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); } key = null; } @@ -722,7 +723,9 @@ public class RpcServer implements RpcServerInterface { try { acceptChannel.close(); selector.close(); - } catch (IOException ignored) { } + } catch (IOException ignored) { + if (LOG.isTraceEnabled()) LOG.trace("ignored", 
ignored); + } selector= null; acceptChannel= null; @@ -870,22 +873,28 @@ public class RpcServer implements RpcServerInterface { /** * Take the list of the connections that want to write, and register them - * in the selector. + * in the selector. */ - private void registerWrites(){ + private void registerWrites() { Iterator<Connection> it = writingCons.iterator(); - while (it.hasNext()){ + while (it.hasNext()) { Connection c = it.next(); it.remove(); SelectionKey sk = c.channel.keyFor(writeSelector); - if (sk == null){ - try { - c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); - } catch (ClosedChannelException e) { - // ignore: the client went away. + try { + if (sk == null) { + try { + c.channel.register(writeSelector, SelectionKey.OP_WRITE, c); + } catch (ClosedChannelException e) { + // ignore: the client went away. + if (LOG.isTraceEnabled()) LOG.trace("ignored", e); + } + } else { + sk.interestOps(SelectionKey.OP_WRITE); } - } else { - sk.interestOps(SelectionKey.OP_WRITE); + } catch (CancelledKeyException e) { + // ignore: the client went away. + if (LOG.isTraceEnabled()) LOG.trace("ignored", e); } } } @@ -1048,15 +1057,17 @@ public class RpcServer implements RpcServerInterface { /** * Process all the responses for this connection * - * @return true if all the calls were processed or that someone else is doing it. false if there - * is still some work to do. In this case, we expect the caller to delay us. + * @return true if all the calls were processed or that someone else is doing it. + * false if there * is still some work to do. In this case, we expect the caller to + * delay us. * @throws IOException */ private boolean processAllResponses(final Connection connection) throws IOException { // We want only one writer on the channel for a connection at a time. 
connection.responseWriteLock.lock(); try { - for (int i = 0; i < 20; i++) { // protection if some handlers manage to need all the responder + for (int i = 0; i < 20; i++) { + // protection if some handlers manage to need all the responder Call call = connection.responseQueue.pollFirst(); if (call == null) { return true; @@ -1279,8 +1290,7 @@ public class RpcServer implements RpcServerInterface { secretManager, this)); break; default: - UserGroupInformation current = UserGroupInformation - .getCurrentUser(); + UserGroupInformation current = UserGroupInformation.getCurrentUser(); String fullName = current.getUserName(); if (LOG.isDebugEnabled()) { LOG.debug("Kerberos principal name is " + fullName);
