Repository: hbase
Updated Branches:
  refs/heads/master f20fac41d -> 9f4b6ac06


HBASE-11835 Wrong managenement of non expected calls in the client (Nicolas 
Liochon)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9f4b6ac0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9f4b6ac0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9f4b6ac0

Branch: refs/heads/master
Commit: 9f4b6ac06c35fb18acd1951382da024780e01e2f
Parents: f20fac4
Author: stack <[email protected]>
Authored: Thu Oct 30 12:41:54 2014 -0700
Committer: stack <[email protected]>
Committed: Thu Oct 30 12:41:54 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  | 20 +++++-----
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 42 ++++++++++++--------
 2 files changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9f4b6ac0/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 225a2c9..4586e3e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -338,7 +338,8 @@ public class RpcClient {
 
   protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
       TokenSelector<? extends TokenIdentifier>> tokenHandlers =
-      new HashMap<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? 
extends TokenIdentifier>>();
+      new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
+        TokenSelector<? extends TokenIdentifier>>();
   static {
     
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
         new AuthenticationTokenSelector());
@@ -652,18 +653,21 @@ public class RpcClient {
           socket.getOutputStream().close();
         }
       } catch (IOException ignored) {  // Can happen if the socket is already 
closed
+        if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
       }
       try {
         if (socket.getInputStream() != null) {
           socket.getInputStream().close();
         }
       } catch (IOException ignored) {  // Can happen if the socket is already 
closed
+        if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
       }
       try {
         if (socket.getChannel() != null) {
           socket.getChannel().close();
         }
       } catch (IOException ignored) {  // Can happen if the socket is already 
closed
+        if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
       }
       try {
         socket.close();
@@ -1161,18 +1165,18 @@ public class RpcClient {
           int readSoFar = 
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
           IOUtils.skipFully(in, whatIsLeftToRead);
+          return;
         }
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
           RemoteException re = createRemoteException(exceptionResponse);
-          if (expectedCall) call.setException(re);
+          call.setException(re);
           if (isFatalConnectionException(exceptionResponse)) {
             markClosed(re);
           }
         } else {
           Message value = null;
-          // Call may be null because it may have timeout and been cleaned up 
on this side already
-          if (expectedCall && call.responseDefaultType != null) {
+          if (call.responseDefaultType != null) {
             Builder builder = call.responseDefaultType.newBuilderForType();
             builder.mergeDelimitedFrom(in);
             value = builder.build();
@@ -1184,9 +1188,7 @@ public class RpcClient {
             IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
             cellBlockScanner = ipcUtil.createCellScanner(this.codec, 
this.compressor, cellBlock);
           }
-          // it's possible that this call may have been cleaned up due to a RPC
-          // timeout, so check if it still exists before setting the value.
-          if (expectedCall) call.setResponse(value, cellBlockScanner);
+          call.setResponse(value, cellBlockScanner);
         }
       } catch (IOException e) {
         if (expectedCall) call.setException(e);
@@ -1194,6 +1196,7 @@ public class RpcClient {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
+          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
         } else {
           // Treat this as a fatal condition and close this connection
           markClosed(e);
@@ -1489,7 +1492,6 @@ public class RpcClient {
           @Override
           public void run(Object parameter) {
             connection.callSender.remove(cts);
-            call.callComplete();
           }
         });
         if (pcrc.isCanceled()) {
@@ -1662,7 +1664,7 @@ public class RpcClient {
     public int hashCode() {
       int hashcode = (address.hashCode() +
         PRIME * (PRIME * this.serviceName.hashCode() ^
-        (ticket == null ? 0 : ticket.hashCode()) ));
+        (ticket == null ? 0 : ticket.hashCode())));
       return hashcode;
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9f4b6ac0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1799d80..4c72714 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -685,6 +685,7 @@ public class RpcServer implements RpcServerInterface {
                   doAccept(key);
               }
             } catch (IOException ignored) {
+              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
             }
             key = null;
           }
@@ -722,7 +723,9 @@ public class RpcServer implements RpcServerInterface {
         try {
           acceptChannel.close();
           selector.close();
-        } catch (IOException ignored) { }
+        } catch (IOException ignored) {
+          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
+        }
 
         selector= null;
         acceptChannel= null;
@@ -870,22 +873,28 @@ public class RpcServer implements RpcServerInterface {
 
     /**
      * Take the list of the connections that want to write, and register them
-     *  in the selector.
+     * in the selector.
      */
-    private void registerWrites(){
+    private void registerWrites() {
       Iterator<Connection> it = writingCons.iterator();
-      while (it.hasNext()){
+      while (it.hasNext()) {
         Connection c = it.next();
         it.remove();
         SelectionKey sk = c.channel.keyFor(writeSelector);
-        if (sk == null){
-          try {
-            c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
-          } catch (ClosedChannelException e) {
-            // ignore: the client went away.
+        try {
+          if (sk == null) {
+            try {
+              c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
+            } catch (ClosedChannelException e) {
+              // ignore: the client went away.
+              if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
+            }
+          } else {
+            sk.interestOps(SelectionKey.OP_WRITE);
           }
-        } else {
-          sk.interestOps(SelectionKey.OP_WRITE);
+        } catch (CancelledKeyException e) {
+          // ignore: the client went away.
+          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
         }
       }
     }
@@ -1048,15 +1057,17 @@ public class RpcServer implements RpcServerInterface {
     /**
      * Process all the responses for this connection
      *
-     * @return true if all the calls were processed or that someone else is 
doing it. false if there
-     * is still some work to do. In this case, we expect the caller to delay 
us.
+     * @return true if all the calls were processed or that someone else is 
doing it.
+     * false if there * is still some work to do. In this case, we expect the 
caller to
+     * delay us.
      * @throws IOException
      */
     private boolean processAllResponses(final Connection connection) throws 
IOException {
       // We want only one writer on the channel for a connection at a time.
       connection.responseWriteLock.lock();
       try {
-        for (int i = 0; i < 20; i++) { // protection if some handlers manage 
to need all the responder
+        for (int i = 0; i < 20; i++) {
+          // protection if some handlers manage to need all the responder
           Call call = connection.responseQueue.pollFirst();
           if (call == null) {
             return true;
@@ -1279,8 +1290,7 @@ public class RpcServer implements RpcServerInterface {
                       secretManager, this));
               break;
             default:
-              UserGroupInformation current = UserGroupInformation
-              .getCurrentUser();
+              UserGroupInformation current = 
UserGroupInformation.getCurrentUser();
               String fullName = current.getUserName();
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Kerberos principal name is " + fullName);

Reply via email to