Repository: hbase
Updated Branches:
  refs/heads/master c67983ebf -> 5568929dd


HBASE-16654 Better handle channelInactive and close for netty rpc client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5568929d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5568929d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5568929d

Branch: refs/heads/master
Commit: 5568929dd2bfc20c1aa15d37d318459888cbd32a
Parents: c67983e
Author: zhangduo <zhang...@apache.org>
Authored: Mon Sep 19 20:52:46 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed Sep 21 22:36:47 2016 +0800

----------------------------------------------------------------------
 .../hbase/ipc/BufferCallBeforeInitHandler.java  |   6 +-
 .../hadoop/hbase/ipc/NettyRpcConnection.java    |   6 +-
 .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java |   9 +-
 .../hbase/security/AsyncHBaseSaslRpcClient.java |  58 --------
 .../AsyncHBaseSaslRpcClientHandler.java         | 135 ------------------
 .../hbase/security/NettyHBaseSaslRpcClient.java |  58 ++++++++
 .../NettyHBaseSaslRpcClientHandler.java         | 142 +++++++++++++++++++
 .../hbase/security/SaslUnwrapHandler.java       |   1 +
 .../hadoop/hbase/security/SaslWrapHandler.java  |  46 +++---
 9 files changed, 241 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
index 573ddd5..c628c31 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -62,13 +62,16 @@ class BufferCallBeforeInitHandler extends 
ChannelDuplexHandler {
   private static final BufferCallEvent SUCCESS_EVENT = new 
BufferCallEvent(BufferCallAction.FLUSH,
       null);
 
-  private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();
+  private final Map<Integer, Call> id2Call = new HashMap<>();
 
   @Override
   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) {
     if (msg instanceof Call) {
       Call call = (Call) msg;
       id2Call.put(call.id, call);
+      // The call is already in track so here we set the write operation as 
success.
+      // We will fail the call directly if we can not write it out.
+      promise.trySuccess();
     } else {
       ctx.write(msg, promise);
     }
@@ -99,5 +102,4 @@ class BufferCallBeforeInitHandler extends 
ChannelDuplexHandler {
       ctx.fireUserEventTriggered(evt);
     }
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 559b7f9..8a85580 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -53,7 +53,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.security.AsyncHBaseSaslRpcClientHandler;
+import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import org.apache.hadoop.hbase.util.Threads;
@@ -190,7 +190,7 @@ class NettyRpcConnection extends RpcConnection {
     Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
     ChannelHandler saslHandler;
     try {
-      saslHandler = new AsyncHBaseSaslRpcClientHandler(saslPromise, ticket, 
authMethod, token,
+      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, 
authMethod, token,
           serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
             "hbase.rpc.protection", 
QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
     } catch (IOException e) {
@@ -205,7 +205,7 @@ class NettyRpcConnection extends RpcConnection {
         if (future.isSuccess()) {
           ChannelPipeline p = ch.pipeline();
           p.remove(SaslChallengeDecoder.class);
-          p.remove(AsyncHBaseSaslRpcClientHandler.class);
+          p.remove(NettyHBaseSaslRpcClientHandler.class);
           established(ch);
         } else {
           final Throwable error = future.cause();

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 1cd89d8..5faaede 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -204,13 +204,18 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
 
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    cleanupCalls(ctx, new IOException("Connection closed"));
+    if (!id2Call.isEmpty()) {
+      cleanupCalls(ctx, new IOException("Connection closed"));
+    }
     conn.shutdown();
+    ctx.fireChannelInactive();
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    cleanupCalls(ctx, IPCUtil.toIOE(cause));
+    if (!id2Call.isEmpty()) {
+      cleanupCalls(ctx, IPCUtil.toIOE(cause));
+    }
     conn.shutdown();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
deleted file mode 100644
index df703dc..0000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import java.io.IOException;
-
-import javax.security.sasl.Sasl;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * Implement SASL logic for async rpc client.
- */
-@InterfaceAudience.Private
-public class AsyncHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
-  private static final Log LOG = 
LogFactory.getLog(AsyncHBaseSaslRpcClient.class);
-
-  public AsyncHBaseSaslRpcClient(AuthMethod method, Token<? extends 
TokenIdentifier> token,
-      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) 
throws IOException {
-    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
-  }
-
-  public void setupSaslHandler(ChannelPipeline p) {
-    String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SASL client context established. Negotiated QoP: " + qop);
-    }
-    if (qop == null || "auth".equalsIgnoreCase(qop)) {
-      return;
-    }
-    // add wrap and unwrap handlers to pipeline.
-    p.addFirst(new SaslWrapHandler(saslClient),
-      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
-      new SaslUnwrapHandler(saslClient));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
deleted file mode 100644
index bccfa30..0000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * Implement SASL logic for async rpc client.
- */
-@InterfaceAudience.Private
-public class AsyncHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
-
-  private static final Log LOG = 
LogFactory.getLog(AsyncHBaseSaslRpcClientHandler.class);
-
-  private final Promise<Boolean> saslPromise;
-
-  private final UserGroupInformation ugi;
-
-  private final AsyncHBaseSaslRpcClient saslRpcClient;
-
-  /**
-   * @param saslPromise {@code true} if success, {@code false} if server tells 
us to fallback to
-   *          simple.
-   */
-  public AsyncHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, 
UserGroupInformation ugi,
-      AuthMethod method, Token<? extends TokenIdentifier> token, String 
serverPrincipal,
-      boolean fallbackAllowed, String rpcProtection) throws IOException {
-    this.saslPromise = saslPromise;
-    this.ugi = ugi;
-    this.saslRpcClient = new AsyncHBaseSaslRpcClient(method, token, 
serverPrincipal,
-        fallbackAllowed, rpcProtection);
-  }
-
-  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Will send token of size " + response.length + " from 
initSASLContext.");
-    }
-    ctx.writeAndFlush(
-      ctx.alloc().buffer(4 + 
response.length).writeInt(response.length).writeBytes(response));
-  }
-
-  private void tryComplete(ChannelHandlerContext ctx) {
-    if (!saslRpcClient.isComplete()) {
-      return;
-    }
-    saslRpcClient.setupSaslHandler(ctx.pipeline());
-    saslPromise.setSuccess(true);
-  }
-
-  @Override
-  public void handlerAdded(ChannelHandlerContext ctx) {
-    try {
-      byte[] initialResponse = ugi.doAs(new 
PrivilegedExceptionAction<byte[]>() {
-
-        @Override
-        public byte[] run() throws Exception {
-          return saslRpcClient.getInitialResponse();
-        }
-      });
-      if (initialResponse != null) {
-        writeResponse(ctx, initialResponse);
-      }
-      tryComplete(ctx);
-    } catch (Exception e) {
-      // the exception thrown by handlerAdded will not be passed to the 
exceptionCaught below
-      // because netty will remove a handler if handlerAdded throws an 
exception.
-      exceptionCaught(ctx, e);
-    }
-  }
-
-  @Override
-  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
-    int len = msg.readInt();
-    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
-      saslRpcClient.dispose();
-      if (saslRpcClient.fallbackAllowed) {
-        saslPromise.setSuccess(false);
-      } else {
-        saslPromise.setFailure(new FallbackDisallowedException());
-      }
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Will read input token of size " + len + " for processing by 
initSASLContext");
-    }
-    final byte[] challenge = new byte[len];
-    msg.readBytes(challenge);
-    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
-
-      @Override
-      public byte[] run() throws Exception {
-        return saslRpcClient.evaluateChallenge(challenge);
-      }
-    });
-    if (response != null) {
-      writeResponse(ctx, response);
-    }
-    tryComplete(ctx);
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    saslRpcClient.dispose();
-    saslPromise.setFailure(cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
new file mode 100644
index 0000000..f624608
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import java.io.IOException;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Implement SASL logic for netty rpc client.
+ */
+@InterfaceAudience.Private
+public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
+  private static final Log LOG = 
LogFactory.getLog(NettyHBaseSaslRpcClient.class);
+
+  public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends 
TokenIdentifier> token,
+      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) 
throws IOException {
+    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
+  }
+
+  public void setupSaslHandler(ChannelPipeline p) {
+    String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SASL client context established. Negotiated QoP: " + qop);
+    }
+    if (qop == null || "auth".equalsIgnoreCase(qop)) {
+      return;
+    }
+    // add wrap and unwrap handlers to pipeline.
+    p.addFirst(new SaslWrapHandler(saslClient),
+      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
+      new SaslUnwrapHandler(saslClient));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
new file mode 100644
index 0000000..50609b4
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Implement SASL logic for netty rpc client.
+ */
+@InterfaceAudience.Private
+public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
+
+  private static final Log LOG = 
LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class);
+
+  private final Promise<Boolean> saslPromise;
+
+  private final UserGroupInformation ugi;
+
+  private final NettyHBaseSaslRpcClient saslRpcClient;
+
+  /**
+   * @param saslPromise {@code true} if success, {@code false} if server tells 
us to fallback to
+   *          simple.
+   */
+  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, 
UserGroupInformation ugi,
+      AuthMethod method, Token<? extends TokenIdentifier> token, String 
serverPrincipal,
+      boolean fallbackAllowed, String rpcProtection) throws IOException {
+    this.saslPromise = saslPromise;
+    this.ugi = ugi;
+    this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, 
serverPrincipal,
+        fallbackAllowed, rpcProtection);
+  }
+
+  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Will send token of size " + response.length + " from 
initSASLContext.");
+    }
+    ctx.writeAndFlush(
+      ctx.alloc().buffer(4 + 
response.length).writeInt(response.length).writeBytes(response));
+  }
+
+  private void tryComplete(ChannelHandlerContext ctx) {
+    if (!saslRpcClient.isComplete()) {
+      return;
+    }
+    saslRpcClient.setupSaslHandler(ctx.pipeline());
+    saslPromise.setSuccess(true);
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) {
+    try {
+      byte[] initialResponse = ugi.doAs(new 
PrivilegedExceptionAction<byte[]>() {
+
+        @Override
+        public byte[] run() throws Exception {
+          return saslRpcClient.getInitialResponse();
+        }
+      });
+      if (initialResponse != null) {
+        writeResponse(ctx, initialResponse);
+      }
+      tryComplete(ctx);
+    } catch (Exception e) {
+      // the exception thrown by handlerAdded will not be passed to the 
exceptionCaught below
+      // because netty will remove a handler if handlerAdded throws an 
exception.
+      exceptionCaught(ctx, e);
+    }
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+    int len = msg.readInt();
+    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
+      saslRpcClient.dispose();
+      if (saslRpcClient.fallbackAllowed) {
+        saslPromise.trySuccess(false);
+      } else {
+        saslPromise.tryFailure(new FallbackDisallowedException());
+      }
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Will read input token of size " + len + " for processing by 
initSASLContext");
+    }
+    final byte[] challenge = new byte[len];
+    msg.readBytes(challenge);
+    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+
+      @Override
+      public byte[] run() throws Exception {
+        return saslRpcClient.evaluateChallenge(challenge);
+      }
+    });
+    if (response != null) {
+      writeResponse(ctx, response);
+    }
+    tryComplete(ctx);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    saslRpcClient.dispose();
+    saslPromise.tryFailure(new IOException("Connection closed"));
+    ctx.fireChannelInactive();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    saslRpcClient.dispose();
+    saslPromise.tryFailure(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
index c2faf91..e631478 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
@@ -42,6 +42,7 @@ public class SaslUnwrapHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     SaslUtil.safeDispose(saslClient);
+    ctx.fireChannelInactive();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
index fefb4f8..14ecf2e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -26,6 +26,8 @@ import io.netty.channel.CoalescingBufferQueue;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.PromiseCombiner;
 
+import java.io.IOException;
+
 import javax.security.sasl.SaslClient;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -40,6 +42,10 @@ public class SaslWrapHandler extends 
ChannelOutboundHandlerAdapter {
 
   private CoalescingBufferQueue queue;
 
+  public SaslWrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
     queue = new CoalescingBufferQueue(ctx.channel());
@@ -55,29 +61,26 @@ public class SaslWrapHandler extends 
ChannelOutboundHandlerAdapter {
     }
   }
 
-  public SaslWrapHandler(SaslClient saslClient) {
-    this.saslClient = saslClient;
-  }
-
   @Override
   public void flush(ChannelHandlerContext ctx) throws Exception {
+    if (queue.isEmpty()) {
+      return;
+    }
     ByteBuf buf = null;
     try {
-      if (!queue.isEmpty()) {
-        ChannelPromise promise = ctx.newPromise();
-        int readableBytes = queue.readableBytes();
-        buf = queue.remove(readableBytes, promise);
-        byte[] bytes = new byte[readableBytes];
-        buf.readBytes(bytes);
-        byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
-        ChannelPromise lenPromise = ctx.newPromise();
-        ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), 
lenPromise);
-        ChannelPromise contentPromise = ctx.newPromise();
-        ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
-        PromiseCombiner combiner = new PromiseCombiner();
-        combiner.addAll(lenPromise, contentPromise);
-        combiner.finish(promise);
-      }
+      ChannelPromise promise = ctx.newPromise();
+      int readableBytes = queue.readableBytes();
+      buf = queue.remove(readableBytes, promise);
+      byte[] bytes = new byte[readableBytes];
+      buf.readBytes(bytes);
+      byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+      ChannelPromise lenPromise = ctx.newPromise();
+      ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), 
lenPromise);
+      ChannelPromise contentPromise = ctx.newPromise();
+      ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+      PromiseCombiner combiner = new PromiseCombiner();
+      combiner.addAll(lenPromise, contentPromise);
+      combiner.finish(promise);
       ctx.flush();
     } finally {
       if (buf != null) {
@@ -88,6 +91,9 @@ public class SaslWrapHandler extends 
ChannelOutboundHandlerAdapter {
 
   @Override
   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws 
Exception {
-    queue.releaseAndFailAll(new Throwable("Closed"));
+    if (!queue.isEmpty()) {
+      queue.releaseAndFailAll(new IOException("Connection closed"));
+    }
+    ctx.close(promise);
   }
 }

Reply via email to