http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
index bb6763f..3d88115 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
@@ -18,6 +18,17 @@
 
 package org.apache.hadoop.hbase.security;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -28,123 +39,23 @@ import org.apache.hadoop.security.SaslOutputStream;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import com.google.common.annotations.VisibleForTesting;
-
 /**
- * A utility class that encapsulates SASL logic for RPC client.
- * Copied from <code>org.apache.hadoop.security</code>
+ * A utility class that encapsulates SASL logic for RPC client. Copied from
+ * <code>org.apache.hadoop.security</code>
  */
 @InterfaceAudience.Private
-public class HBaseSaslRpcClient {
-  private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
+public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
 
-  private final SaslClient saslClient;
-  private final boolean fallbackAllowed;
-  /**
-   * Create a HBaseSaslRpcClient for an authentication method
-   *
-   * @param method
-   *          the requested authentication method
-   * @param token
-   *          token to use if needed by the authentication method
-   * @param serverPrincipal
-   *          the server principal that we are trying to set the connection up 
to
-   * @param fallbackAllowed
-   *          does the client allow fallback to simple authentication
-   * @throws IOException
-   */
-  public HBaseSaslRpcClient(AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal, boolean 
fallbackAllowed)
-      throws IOException {
-    this(method, token, serverPrincipal, fallbackAllowed, "authentication");
-  }
-  /**
-   * Create a HBaseSaslRpcClient for an authentication method
-   *
-   * @param method
-   *          the requested authentication method
-   * @param token
-   *          token to use if needed by the authentication method
-   * @param serverPrincipal
-   *          the server principal that we are trying to set the connection up 
to
-   * @param fallbackAllowed
-   *          does the client allow fallback to simple authentication
-   * @param rpcProtection
-   *          the protection level ("authentication", "integrity" or "privacy")
-   * @throws IOException
-   */
-  public HBaseSaslRpcClient(AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal, boolean 
fallbackAllowed,
-      String rpcProtection) throws IOException {
-    this.fallbackAllowed = fallbackAllowed;
-    SaslUtil.initSaslProperties(rpcProtection);
-    switch (method) {
-    case DIGEST:
-      if (LOG.isDebugEnabled())
-        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
-            + " client to authenticate to service at " + token.getService());
-      saslClient = createDigestSaslClient(
-          new String[] { AuthMethod.DIGEST.getMechanismName() },
-          SaslUtil.SASL_DEFAULT_REALM, new SaslClientCallbackHandler(token));
-      break;
-    case KERBEROS:
-      if (LOG.isDebugEnabled()) {
-        LOG
-            .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
-                + " client. Server's Kerberos principal name is "
-                + serverPrincipal);
-      }
-      if (serverPrincipal == null || serverPrincipal.length() == 0) {
-        throw new IOException(
-            "Failed to specify server's Kerberos principal name");
-      }
-      String[] names = SaslUtil.splitKerberosName(serverPrincipal);
-      if (names.length != 3) {
-        throw new IOException(
-          "Kerberos principal does not have the expected format: "
-                + serverPrincipal);
-      }
-      saslClient = createKerberosSaslClient(
-          new String[] { AuthMethod.KERBEROS.getMechanismName() },
-          names[0], names[1]);
-      break;
-    default:
-      throw new IOException("Unknown authentication method " + method);
-    }
-    if (saslClient == null)
-      throw new IOException("Unable to find SASL client implementation");
-  }
+  private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
 
-  protected SaslClient createDigestSaslClient(String[] mechanismNames,
-      String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-      throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm,
-        SaslUtil.SASL_PROPS, saslClientCallbackHandler);
+  public HBaseSaslRpcClient(AuthMethod method, Token<? extends 
TokenIdentifier> token,
+      String serverPrincipal, boolean fallbackAllowed) throws IOException {
+    super(method, token, serverPrincipal, fallbackAllowed);
   }
 
-  protected SaslClient createKerberosSaslClient(String[] mechanismNames,
-      String userFirstPart, String userSecondPart) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, userFirstPart,
-        userSecondPart, SaslUtil.SASL_PROPS, null);
+  public HBaseSaslRpcClient(AuthMethod method, Token<? extends 
TokenIdentifier> token,
+      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) 
throws IOException {
+    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
   }
 
   private static void readStatus(DataInputStream inStream) throws IOException {
@@ -156,72 +67,65 @@ public class HBaseSaslRpcClient {
   }
 
   /**
-   * Do client side SASL authentication with server via the given InputStream
-   * and OutputStream
-   *
-   * @param inS
-   *          InputStream to use
-   * @param outS
-   *          OutputStream to use
-   * @return true if connection is set up, or false if needs to switch
-   *             to simple Auth.
+   * Do client side SASL authentication with server via the given InputStream 
and OutputStream
+   * @param inS InputStream to use
+   * @param outS OutputStream to use
+   * @return true if connection is set up, or false if needs to switch to 
simple Auth.
    * @throws IOException
    */
-  public boolean saslConnect(InputStream inS, OutputStream outS)
-      throws IOException {
+  public boolean saslConnect(InputStream inS, OutputStream outS) throws 
IOException {
     DataInputStream inStream = new DataInputStream(new 
BufferedInputStream(inS));
-    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
-        outS));
+    DataOutputStream outStream = new DataOutputStream(new 
BufferedOutputStream(outS));
 
     try {
-      byte[] saslToken = new byte[0];
-      if (saslClient.hasInitialResponse())
-        saslToken = saslClient.evaluateChallenge(saslToken);
+      byte[] saslToken = getInitialResponse();
       if (saslToken != null) {
         outStream.writeInt(saslToken.length);
         outStream.write(saslToken, 0, saslToken.length);
         outStream.flush();
-        if (LOG.isDebugEnabled())
-          LOG.debug("Have sent token of size " + saslToken.length
-              + " from initSASLContext.");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Have sent token of size " + saslToken.length + " from 
initSASLContext.");
+        }
       }
-      if (!saslClient.isComplete()) {
+      if (!isComplete()) {
         readStatus(inStream);
         int len = inStream.readInt();
         if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
           if (!fallbackAllowed) {
-            throw new IOException("Server asks us to fall back to SIMPLE auth, 
" +
-                "but this client is configured to only allow secure 
connections.");
+            throw new IOException("Server asks us to fall back to SIMPLE auth, 
"
+                + "but this client is configured to only allow secure 
connections.");
           }
           if (LOG.isDebugEnabled()) {
             LOG.debug("Server asks us to fall back to simple auth.");
           }
-          saslClient.dispose();
+          dispose();
           return false;
         }
         saslToken = new byte[len];
-        if (LOG.isDebugEnabled())
+        if (LOG.isDebugEnabled()) {
           LOG.debug("Will read input token of size " + saslToken.length
               + " for processing by initSASLContext");
+        }
         inStream.readFully(saslToken);
       }
 
-      while (!saslClient.isComplete()) {
-        saslToken = saslClient.evaluateChallenge(saslToken);
+      while (!isComplete()) {
+        saslToken = evaluateChallenge(saslToken);
         if (saslToken != null) {
-          if (LOG.isDebugEnabled())
-            LOG.debug("Will send token of size " + saslToken.length
-                + " from initSASLContext.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Will send token of size " + saslToken.length + " from 
initSASLContext.");
+          }
           outStream.writeInt(saslToken.length);
           outStream.write(saslToken, 0, saslToken.length);
           outStream.flush();
         }
-        if (!saslClient.isComplete()) {
+        if (!isComplete()) {
           readStatus(inStream);
           saslToken = new byte[inStream.readInt()];
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
             LOG.debug("Will read input token of size " + saslToken.length
                 + " for processing by initSASLContext");
+          }
           inStream.readFully(saslToken);
         }
       }
@@ -241,11 +145,8 @@ public class HBaseSaslRpcClient {
   }
 
   /**
-   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
-   * been called.
-   *
-   * @param in
-   *          the InputStream to wrap
+   * Get a SASL wrapped InputStream. Can be called only after saslConnect() 
has been called.
+   * @param in the InputStream to wrap
    * @return a SASL wrapped InputStream
    * @throws IOException
    */
@@ -257,11 +158,8 @@ public class HBaseSaslRpcClient {
   }
 
   /**
-   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() 
has
-   * been called.
-   *
-   * @param out
-   *          the OutputStream to wrap
+   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() 
has been called.
+   * @param out the OutputStream to wrap
    * @return a SASL wrapped OutputStream
    * @throws IOException
    */
@@ -271,58 +169,4 @@ public class HBaseSaslRpcClient {
     }
     return new SaslOutputStream(out, saslClient);
   }
-
-  /** Release resources used by wrapped saslClient */
-  public void dispose() throws SaslException {
-    saslClient.dispose();
-  }
-
-  @VisibleForTesting
-  static class SaslClientCallbackHandler implements CallbackHandler {
-    private final String userName;
-    private final char[] userPassword;
-
-    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
-      this.userName = SaslUtil.encodeIdentifier(token.getIdentifier());
-      this.userPassword = SaslUtil.encodePassword(token.getPassword());
-    }
-
-    @Override
-    public void handle(Callback[] callbacks)
-        throws UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL client callback");
-        }
-      }
-      if (nc != null) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("SASL client callback: setting username: " + userName);
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("SASL client callback: setting userPassword");
-        pc.setPassword(userPassword);
-      }
-      if (rc != null) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("SASL client callback: setting realm: "
-              + rc.getDefaultText());
-        rc.setText(rc.getDefaultText());
-      }
-    }
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
new file mode 100644
index 0000000..f624608
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import java.io.IOException;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Implement SASL logic for netty rpc client.
+ */
[email protected]
+public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
+  private static final Log LOG = 
LogFactory.getLog(NettyHBaseSaslRpcClient.class);
+
+  public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends 
TokenIdentifier> token,
+      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) 
throws IOException {
+    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
+  }
+
+  public void setupSaslHandler(ChannelPipeline p) {
+    String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SASL client context established. Negotiated QoP: " + qop);
+    }
+    if (qop == null || "auth".equalsIgnoreCase(qop)) {
+      return;
+    }
+    // add wrap and unwrap handlers to pipeline.
+    p.addFirst(new SaslWrapHandler(saslClient),
+      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
+      new SaslUnwrapHandler(saslClient));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
new file mode 100644
index 0000000..50609b4
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Implement SASL logic for netty rpc client.
+ */
[email protected]
+public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
+
+  private static final Log LOG = 
LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class);
+
+  private final Promise<Boolean> saslPromise;
+
+  private final UserGroupInformation ugi;
+
+  private final NettyHBaseSaslRpcClient saslRpcClient;
+
+  /**
+   * @param saslPromise {@code true} if success, {@code false} if server tells 
us to fallback to
+   *          simple.
+   */
+  public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, 
UserGroupInformation ugi,
+      AuthMethod method, Token<? extends TokenIdentifier> token, String 
serverPrincipal,
+      boolean fallbackAllowed, String rpcProtection) throws IOException {
+    this.saslPromise = saslPromise;
+    this.ugi = ugi;
+    this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, 
serverPrincipal,
+        fallbackAllowed, rpcProtection);
+  }
+
+  private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Will send token of size " + response.length + " from 
initSASLContext.");
+    }
+    ctx.writeAndFlush(
+      ctx.alloc().buffer(4 + 
response.length).writeInt(response.length).writeBytes(response));
+  }
+
+  private void tryComplete(ChannelHandlerContext ctx) {
+    if (!saslRpcClient.isComplete()) {
+      return;
+    }
+    saslRpcClient.setupSaslHandler(ctx.pipeline());
+    saslPromise.setSuccess(true);
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) {
+    try {
+      byte[] initialResponse = ugi.doAs(new 
PrivilegedExceptionAction<byte[]>() {
+
+        @Override
+        public byte[] run() throws Exception {
+          return saslRpcClient.getInitialResponse();
+        }
+      });
+      if (initialResponse != null) {
+        writeResponse(ctx, initialResponse);
+      }
+      tryComplete(ctx);
+    } catch (Exception e) {
+      // the exception thrown by handlerAdded will not be passed to the 
exceptionCaught below
+      // because netty will remove a handler if handlerAdded throws an 
exception.
+      exceptionCaught(ctx, e);
+    }
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+    int len = msg.readInt();
+    if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
+      saslRpcClient.dispose();
+      if (saslRpcClient.fallbackAllowed) {
+        saslPromise.trySuccess(false);
+      } else {
+        saslPromise.tryFailure(new FallbackDisallowedException());
+      }
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Will read input token of size " + len + " for processing by 
initSASLContext");
+    }
+    final byte[] challenge = new byte[len];
+    msg.readBytes(challenge);
+    byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+
+      @Override
+      public byte[] run() throws Exception {
+        return saslRpcClient.evaluateChallenge(challenge);
+      }
+    });
+    if (response != null) {
+      writeResponse(ctx, response);
+    }
+    tryComplete(ctx);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    saslRpcClient.dispose();
+    saslPromise.tryFailure(new IOException("Connection closed"));
+    ctx.fireChannelInactive();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    saslRpcClient.dispose();
+    saslPromise.tryFailure(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
new file mode 100644
index 0000000..57bb36c
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Decode the sasl challenge sent by RpcServer.
+ */
[email protected]
+public class SaslChallengeDecoder extends ByteToMessageDecoder {
+
+  private static final int MAX_CHALLENGE_SIZE = 1024 * 1024; // 1M
+
+  private ByteBuf tryDecodeChallenge(ByteBuf in, int offset, int 
readableBytes) throws IOException {
+    if (readableBytes < 4) {
+      return null;
+    }
+    int len = in.getInt(offset);
+    if (len <= 0) {
+      // fall back to simple
+      in.readerIndex(offset + 4);
+      return in.retainedSlice(offset, 4);
+    }
+    if (len > MAX_CHALLENGE_SIZE) {
+      throw new IOException(
+          "Sasl challenge too large(" + len + "), max allowed is " + 
MAX_CHALLENGE_SIZE);
+    }
+    int totalLen = 4 + len;
+    if (readableBytes < totalLen) {
+      return null;
+    }
+    in.readerIndex(offset + totalLen);
+    return in.retainedSlice(offset, totalLen);
+  }
+
+  // will throw a RemoteException out if data is enough, so do not need to 
return anything.
+  private void tryDecodeError(ByteBuf in, int offset, int readableBytes) 
throws IOException {
+    if (readableBytes < 4) {
+      return;
+    }
+    int classLen = in.getInt(offset);
+    if (classLen <= 0) {
+      throw new IOException("Invalid exception class name length " + classLen);
+    }
+    if (classLen > MAX_CHALLENGE_SIZE) {
+      throw new IOException("Exception class name length too large(" + 
classLen +
+          "), max allowed is " + MAX_CHALLENGE_SIZE);
+    }
+    if (readableBytes < 4 + classLen + 4) {
+      return;
+    }
+    int msgLen = in.getInt(offset + 4 + classLen);
+    if (msgLen <= 0) {
+      throw new IOException("Invalid exception message length " + msgLen);
+    }
+    if (msgLen > MAX_CHALLENGE_SIZE) {
+      throw new IOException("Exception message length too large(" + msgLen + 
"), max allowed is " +
+          MAX_CHALLENGE_SIZE);
+    }
+    int totalLen = classLen + msgLen + 8;
+    if (readableBytes < totalLen) {
+      return;
+    }
+    String className = in.toString(offset + 4, classLen, 
HConstants.UTF8_CHARSET);
+    String msg = in.toString(offset + classLen + 8, msgLen, 
HConstants.UTF8_CHARSET);
+    in.readerIndex(offset + totalLen);
+    throw new RemoteException(className, msg);
+  }
+
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) throws Exception {
+    int readableBytes = in.readableBytes();
+    if (readableBytes < 4) {
+      return;
+    }
+    int offset = in.readerIndex();
+    int status = in.getInt(offset);
+    if (status == SaslStatus.SUCCESS.state) {
+      ByteBuf challenge = tryDecodeChallenge(in, offset + 4, readableBytes - 
4);
+      if (challenge != null) {
+        out.add(challenge);
+      }
+    } else {
+      tryDecodeError(in, offset + 4, readableBytes - 4);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
deleted file mode 100644
index 0f11083..0000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.security.PrivilegedExceptionAction;
-import java.util.Random;
-
-/**
- * Handles Sasl connections
- */
[email protected]
-public class SaslClientHandler extends ChannelDuplexHandler {
-  private static final Log LOG = LogFactory.getLog(SaslClientHandler.class);
-
-  private final boolean fallbackAllowed;
-
-  private final UserGroupInformation ticket;
-
-  /**
-   * Used for client or server's token to send or receive from each other.
-   */
-  private final SaslClient saslClient;
-  private final SaslExceptionHandler exceptionHandler;
-  private final SaslSuccessfulConnectHandler successfulConnectHandler;
-  private byte[] saslToken;
-  private byte[] connectionHeader;
-  private boolean firstRead = true;
-
-  private int retryCount = 0;
-  private Random random;
-
-  /**
-   * Constructor
-   *
-   * @param ticket                   the ugi
-   * @param method                   auth method
-   * @param token                    for Sasl
-   * @param serverPrincipal          Server's Kerberos principal name
-   * @param fallbackAllowed          True if server may also fall back to less 
secure connection
-   * @param rpcProtection            Quality of protection. Can be 
'authentication', 'integrity' or
-   *                                 'privacy'.
-   * @param exceptionHandler         handler for exceptions
-   * @param successfulConnectHandler handler for succesful connects
-   * @throws java.io.IOException if handler could not be created
-   */
-  public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal, boolean 
fallbackAllowed,
-      String rpcProtection, byte[] connectionHeader, SaslExceptionHandler 
exceptionHandler,
-      SaslSuccessfulConnectHandler successfulConnectHandler) throws 
IOException {
-    this.ticket = ticket;
-    this.fallbackAllowed = fallbackAllowed;
-    this.connectionHeader = connectionHeader;
-
-    this.exceptionHandler = exceptionHandler;
-    this.successfulConnectHandler = successfulConnectHandler;
-
-    SaslUtil.initSaslProperties(rpcProtection);
-    switch (method) {
-    case DIGEST:
-      if (LOG.isDebugEnabled())
-        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
-            + " client to authenticate to service at " + token.getService());
-      saslClient = createDigestSaslClient(new String[] { 
AuthMethod.DIGEST.getMechanismName() },
-          SaslUtil.SASL_DEFAULT_REALM, new 
HBaseSaslRpcClient.SaslClientCallbackHandler(token));
-      break;
-    case KERBEROS:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
-            + " client. Server's Kerberos principal name is " + 
serverPrincipal);
-      }
-      if (serverPrincipal == null || serverPrincipal.isEmpty()) {
-        throw new IOException("Failed to specify server's Kerberos principal 
name");
-      }
-      String[] names = SaslUtil.splitKerberosName(serverPrincipal);
-      if (names.length != 3) {
-        throw new IOException(
-            "Kerberos principal does not have the expected format: " + 
serverPrincipal);
-      }
-      saslClient = createKerberosSaslClient(new String[] { 
AuthMethod.KERBEROS.getMechanismName() },
-          names[0], names[1]);
-      break;
-    default:
-      throw new IOException("Unknown authentication method " + method);
-    }
-    if (saslClient == null) {
-      throw new IOException("Unable to find SASL client implementation");
-    }
-  }
-
-  /**
-   * Create a Digest Sasl client
-   *
-   * @param mechanismNames            names of mechanisms
-   * @param saslDefaultRealm          default realm for sasl
-   * @param saslClientCallbackHandler handler for the client
-   * @return new SaslClient
-   * @throws java.io.IOException if creation went wrong
-   */
-  protected SaslClient createDigestSaslClient(String[] mechanismNames, String 
saslDefaultRealm,
-      CallbackHandler saslClientCallbackHandler) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, 
SaslUtil.SASL_PROPS,
-        saslClientCallbackHandler);
-  }
-
-  /**
-   * Create Kerberos client
-   *
-   * @param mechanismNames names of mechanisms
-   * @param userFirstPart  first part of username
-   * @param userSecondPart second part of username
-   * @return new SaslClient
-   * @throws java.io.IOException if fails
-   */
-  protected SaslClient createKerberosSaslClient(String[] mechanismNames, 
String userFirstPart,
-      String userSecondPart) throws IOException {
-    return Sasl
-        .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, 
SaslUtil.SASL_PROPS,
-            null);
-  }
-
-  @Override
-  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-    saslClient.dispose();
-  }
-
-  private byte[] evaluateChallenge(final byte[] challenge) throws Exception {
-    return ticket.doAs(new PrivilegedExceptionAction<byte[]>() {
-
-      @Override
-      public byte[] run() throws Exception {
-        return saslClient.evaluateChallenge(challenge);
-      }
-    });
-  }
-
-  @Override
-  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
-    saslToken = new byte[0];
-    if (saslClient.hasInitialResponse()) {
-      saslToken = evaluateChallenge(saslToken);
-    }
-    if (saslToken != null) {
-      writeSaslToken(ctx, saslToken);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Have sent token of size " + saslToken.length + " from 
initSASLContext.");
-      }
-    }
-  }
-
-  @Override
-  public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-    ByteBuf in = (ByteBuf) msg;
-
-    // If not complete, try to negotiate
-    if (!saslClient.isComplete()) {
-      while (!saslClient.isComplete() && in.isReadable()) {
-        readStatus(in);
-        int len = in.readInt();
-        if (firstRead) {
-          firstRead = false;
-          if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
-            if (!fallbackAllowed) {
-              throw new IOException("Server asks us to fall back to SIMPLE 
auth, " + "but this "
-                  + "client is configured to only allow secure connections.");
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Server asks us to fall back to simple auth.");
-            }
-            saslClient.dispose();
-
-            ctx.pipeline().remove(this);
-            successfulConnectHandler.onSuccess(ctx.channel());
-            return;
-          }
-        }
-        saslToken = new byte[len];
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Will read input token of size " + saslToken.length
-              + " for processing by initSASLContext");
-        }
-        in.readBytes(saslToken);
-
-        saslToken = evaluateChallenge(saslToken);
-        if (saslToken != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Will send token of size " + saslToken.length + " from 
initSASLContext.");
-          }
-          writeSaslToken(ctx, saslToken);
-        }
-      }
-      // release the memory
-      in.release();
-
-      if (saslClient.isComplete()) {
-        String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL client context established. Negotiated QoP: " + qop);
-        }
-
-        boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
-
-        if (!useWrap) {
-          ctx.pipeline().remove(this);
-          successfulConnectHandler.onSuccess(ctx.channel());
-        } else {
-          byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, 
connectionHeader.length);
-          // write connection header
-          writeSaslToken(ctx, wrappedCH);
-          successfulConnectHandler.onSaslProtectionSucess(ctx.channel());
-        }
-      }
-    }
-    // Normal wrapped reading
-    else {
-      try {
-        int length = in.readInt();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Actual length is " + length);
-        }
-        saslToken = new byte[length];
-        in.readBytes(saslToken);
-        // release the memory
-        in.release();
-      } catch (IndexOutOfBoundsException e) {
-        return;
-      }
-      try {
-        ByteBuf b = ctx.channel().alloc().buffer(saslToken.length);
-
-        b.writeBytes(saslClient.unwrap(saslToken, 0, saslToken.length));
-        ctx.fireChannelRead(b);
-
-      } catch (SaslException se) {
-        try {
-          saslClient.dispose();
-        } catch (SaslException ignored) {
-          LOG.debug("Ignoring SASL exception", ignored);
-        }
-        throw se;
-      }
-    }
-  }
-
-  /**
-   * Write SASL token
-   * @param ctx to write to
-   * @param saslToken to write
-   */
-  private void writeSaslToken(final ChannelHandlerContext ctx, byte[] 
saslToken) {
-    ByteBuf b = ctx.alloc().buffer(4 + saslToken.length);
-    b.writeInt(saslToken.length);
-    b.writeBytes(saslToken, 0, saslToken.length);
-    ctx.writeAndFlush(b).addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (!future.isSuccess()) {
-          exceptionCaught(ctx, future.cause());
-        }
-      }
-    });
-  }
-
-  /**
-   * Get the read status
-   *
-   * @param inStream to read
-   * @throws org.apache.hadoop.ipc.RemoteException if status was not success
-   */
-  private static void readStatus(ByteBuf inStream) throws RemoteException {
-    int status = inStream.readInt(); // read status
-    if (status != SaslStatus.SUCCESS.state) {
-      throw new RemoteException(inStream.toString(Charset.forName("UTF-8")),
-          inStream.toString(Charset.forName("UTF-8")));
-    }
-  }
-
-  @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause)
-      throws Exception {
-    saslClient.dispose();
-
-    ctx.close();
-
-    if (this.random == null) {
-      this.random = new Random();
-    }
-    exceptionHandler.handle(this.retryCount++, this.random, cause);
-  }
-
-  @Override
-  public void write(final ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
-      throws Exception {
-    // If not complete, try to negotiate
-    if (!saslClient.isComplete()) {
-      super.write(ctx, msg, promise);
-    } else {
-      ByteBuf in = (ByteBuf) msg;
-      byte[] unwrapped = new byte[in.readableBytes()];
-      in.readBytes(unwrapped);
-      // release the memory
-      in.release();
-
-      try {
-        saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length);
-      } catch (SaslException se) {
-        try {
-          saslClient.dispose();
-        } catch (SaslException ignored) {
-          LOG.debug("Ignoring SASL exception", ignored);
-        }
-        promise.setFailure(se);
-      }
-      if (saslToken != null) {
-        ByteBuf out = ctx.channel().alloc().buffer(4 + saslToken.length);
-        out.writeInt(saslToken.length);
-        out.writeBytes(saslToken, 0, saslToken.length);
-
-        ctx.write(out).addListener(new ChannelFutureListener() {
-          @Override public void operationComplete(ChannelFuture future) throws 
Exception {
-            if (!future.isSuccess()) {
-              exceptionCaught(ctx, future.cause());
-            }
-          }
-        });
-
-        saslToken = null;
-      }
-    }
-  }
-
-  /**
-   * Handler for exceptions during Sasl connection
-   */
-  public interface SaslExceptionHandler {
-    /**
-     * Handle the exception
-     *
-     * @param retryCount current retry count
-     * @param random     to create new backoff with
-     * @param cause      of fail
-     */
-    public void handle(int retryCount, Random random, Throwable cause);
-  }
-
-  /**
-   * Handler for successful connects
-   */
-  public interface SaslSuccessfulConnectHandler {
-    /**
-     * Runs on success
-     *
-     * @param channel which is successfully authenticated
-     */
-    public void onSuccess(Channel channel);
-
-    /**
-     * Runs on success if data protection used in Sasl
-     *
-     * @param channel which is successfully authenticated
-     */
-    public void onSaslProtectionSucess(Channel channel);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
new file mode 100644
index 0000000..e631478
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Unwrap sasl messages. Should be placed after a
+ * {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder}
+ */
[email protected]
+public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+  private final SaslClient saslClient;
+
+  public SaslUnwrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    SaslUtil.safeDispose(saslClient);
+    ctx.fireChannelInactive();
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+    byte[] bytes = new byte[msg.readableBytes()];
+    msg.readBytes(bytes);
+    ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, 
bytes.length)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
index cfc4088..aaa9d7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
@@ -18,14 +18,12 @@
  */
 package org.apache.hadoop.hbase.security;
 
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
 
 import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
@@ -34,26 +32,33 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class SaslUtil {
-  private static final Log log = LogFactory.getLog(SaslUtil.class);
+  private static final Log LOG = LogFactory.getLog(SaslUtil.class);
   public static final String SASL_DEFAULT_REALM = "default";
-  public static final Map<String, String> SASL_PROPS =
-      new TreeMap<String, String>();
   public static final int SWITCH_TO_SIMPLE_AUTH = -88;
 
-  public static enum QualityOfProtection {
+  public enum QualityOfProtection {
     AUTHENTICATION("auth"),
     INTEGRITY("auth-int"),
     PRIVACY("auth-conf");
 
-    public final String saslQop;
+    private final String saslQop;
 
-    private QualityOfProtection(String saslQop) {
+    QualityOfProtection(String saslQop) {
       this.saslQop = saslQop;
     }
 
     public String getSaslQop() {
       return saslQop;
     }
+
+    public boolean matches(String stringQop) {
+      if (saslQop.equals(stringQop)) {
+        LOG.warn("Use authentication/integrity/privacy as value for rpc 
protection "
+            + "configurations instead of auth/auth-int/auth-conf.");
+        return true;
+      }
+      return name().equalsIgnoreCase(stringQop);
+    }
   }
 
   /** Splitting fully qualified Kerberos name into parts */
@@ -75,40 +80,47 @@ public class SaslUtil {
 
   /**
    * Returns {@link 
org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection}
-   * corresponding to the given {@code stringQop} value. Returns null if value 
is
-   * invalid.
+   * corresponding to the given {@code stringQop} value.
+   * @throws IllegalArgumentException If stringQop doesn't match any QOP.
    */
   public static QualityOfProtection getQop(String stringQop) {
-    QualityOfProtection qop = null;
-    if 
(QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT).equals(stringQop)
-        || QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.AUTHENTICATION;
-    } else if 
(QualityOfProtection.INTEGRITY.name().toLowerCase(Locale.ROOT).equals(stringQop)
-        || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.INTEGRITY;
-    } else if 
(QualityOfProtection.PRIVACY.name().toLowerCase(Locale.ROOT).equals(stringQop)
-        || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
-      qop = QualityOfProtection.PRIVACY;
+    for (QualityOfProtection qop : QualityOfProtection.values()) {
+      if (qop.matches(stringQop)) {
+        return qop;
+      }
     }
-    if (qop == null) {
-      throw new IllegalArgumentException("Invalid qop: " +  stringQop
-          + ". It must be one of 'authentication', 'integrity', 'privacy'.");
-    }
-    if (QualityOfProtection.AUTHENTICATION.saslQop.equals(stringQop)
-        || QualityOfProtection.INTEGRITY.saslQop.equals(stringQop)
-        || QualityOfProtection.PRIVACY.saslQop.equals(stringQop)) {
-      log.warn("Use authentication/integrity/privacy as value for rpc 
protection "
-          + "configurations instead of auth/auth-int/auth-conf.");
+    throw new IllegalArgumentException("Invalid qop: " +  stringQop
+        + ". It must be one of 'authentication', 'integrity', 'privacy'.");
+  }
+
+  /**
+   * @param rpcProtection Value of 'hbase.rpc.protection' configuration.
+   * @return Map with values for SASL properties.
+   */
+  static Map<String, String> initSaslProperties(String rpcProtection) {
+    String saslQop;
+    if (rpcProtection.isEmpty()) {
+      saslQop = QualityOfProtection.AUTHENTICATION.getSaslQop();
+    } else {
+      String[] qops = rpcProtection.split(",");
+      StringBuilder saslQopBuilder = new StringBuilder();
+      for (int i = 0; i < qops.length; ++i) {
+        QualityOfProtection qop = getQop(qops[i]);
+        saslQopBuilder.append(",").append(qop.getSaslQop());
+      }
+      saslQop = saslQopBuilder.substring(1);  // remove first ','
     }
-    return qop;
+    Map<String, String> saslProps = new TreeMap<>();
+    saslProps.put(Sasl.QOP, saslQop);
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    return saslProps;
   }
 
-  static void initSaslProperties(String rpcProtection) {
-    QualityOfProtection saslQOP = getQop(rpcProtection);
-    if (saslQOP == null) {
-      saslQOP = QualityOfProtection.AUTHENTICATION;
+  static void safeDispose(SaslClient saslClient) {
+    try {
+      saslClient.dispose();
+    } catch (SaslException e) {
+      LOG.error("Error disposing of SASL client", e);
     }
-    SaslUtil.SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
-    SaslUtil.SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
new file mode 100644
index 0000000..14ecf2e
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.CoalescingBufferQueue;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.PromiseCombiner;
+
+import java.io.IOException;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * wrap sasl messages.
+ */
[email protected]
+public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+  private final SaslClient saslClient;
+
+  private CoalescingBufferQueue queue;
+
+  public SaslWrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    queue = new CoalescingBufferQueue(ctx.channel());
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise)
+      throws Exception {
+    if (msg instanceof ByteBuf) {
+      queue.add((ByteBuf) msg, promise);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  @Override
+  public void flush(ChannelHandlerContext ctx) throws Exception {
+    if (queue.isEmpty()) {
+      return;
+    }
+    ByteBuf buf = null;
+    try {
+      ChannelPromise promise = ctx.newPromise();
+      int readableBytes = queue.readableBytes();
+      buf = queue.remove(readableBytes, promise);
+      byte[] bytes = new byte[readableBytes];
+      buf.readBytes(bytes);
+      byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+      ChannelPromise lenPromise = ctx.newPromise();
+      ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), 
lenPromise);
+      ChannelPromise contentPromise = ctx.newPromise();
+      ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+      PromiseCombiner combiner = new PromiseCombiner();
+      combiner.addAll(lenPromise, contentPromise);
+      combiner.finish(promise);
+      ctx.flush();
+    } finally {
+      if (buf != null) {
+        ReferenceCountUtil.safeRelease(buf);
+      }
+    }
+  }
+
+  @Override
+  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws 
Exception {
+    if (!queue.isEmpty()) {
+      queue.releaseAndFailAll(new IOException("Connection closed"));
+    }
+    ctx.close(promise);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
index 5799aaf..2f9d921 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface;
@@ -99,7 +99,7 @@ public class AccessControlClient {
   public static void grant(Connection connection, final TableName tableName, 
final String userName,
       final byte[] family, final byte[] qual, boolean mergeExistingPermissions,
       final Permission.Action... actions) throws Throwable {
-    PayloadCarryingRpcController controller =
+    HBaseRpcController controller =
         ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     controller.setPriority(tableName);
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
@@ -139,7 +139,7 @@ public class AccessControlClient {
   public static void grant(final Connection connection, final String namespace,
       final String userName, boolean mergeExistingPermissions, final 
Permission.Action... actions)
       throws Throwable {
-    PayloadCarryingRpcController controller =
+    HBaseRpcController controller =
         ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
 
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
@@ -174,7 +174,7 @@ public class AccessControlClient {
    */
   public static void grant(final Connection connection, final String userName,
       boolean mergeExistingPermissions, final Permission.Action... actions) 
throws Throwable {
-    PayloadCarryingRpcController controller =
+    HBaseRpcController controller =
         ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
       ProtobufUtil.grant(controller, getAccessControlServiceStub(table), 
userName,
@@ -214,7 +214,7 @@ public class AccessControlClient {
   public static void revoke(final Connection connection, final TableName 
tableName,
       final String username, final byte[] family, final byte[] qualifier,
       final Permission.Action... actions) throws Throwable {
-    PayloadCarryingRpcController controller
+    HBaseRpcController controller
       = ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     controller.setPriority(tableName);
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
@@ -233,7 +233,7 @@ public class AccessControlClient {
    */
   public static void revoke(final Connection connection, final String 
namespace,
       final String userName, final Permission.Action... actions) throws 
Throwable {
-    PayloadCarryingRpcController controller
+    HBaseRpcController controller
       = ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
       ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), 
userName, namespace,
@@ -247,7 +247,7 @@ public class AccessControlClient {
    */
   public static void revoke(final Connection connection, final String userName,
       final Permission.Action... actions) throws Throwable {
-    PayloadCarryingRpcController controller
+    HBaseRpcController controller
       = ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
       ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), 
userName, actions);
@@ -263,7 +263,7 @@ public class AccessControlClient {
    */
   public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex)
       throws Throwable {
-    PayloadCarryingRpcController controller
+    HBaseRpcController controller
       = ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     List<UserPermission> permList = new ArrayList<UserPermission>();
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index 1630d83..f70e0de 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.RpcController;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.ConnectException;
@@ -43,7 +46,6 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.ipc.FailedServerException;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -58,8 +60,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-
 /**
  * Utility class to perform operation (get/wait for/verify/set/delete) on 
znode in ZooKeeper
  * which keeps hbase:meta region server location.
@@ -319,7 +319,7 @@ public class MetaTableLocator {
       return false;
     }
     Throwable t;
-    PayloadCarryingRpcController controller = null;
+    RpcController controller = null;
     if (connection instanceof ClusterConnection) {
       controller = ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java
index 8aa8007..6385c27 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.protobuf.RpcController;
+
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -27,21 +29,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-import com.google.protobuf.RpcController;
-
 /**
  * Test snapshot logic from the client
  */
@@ -88,7 +88,7 @@ public class TestSnapshotFromAdmin {
     RpcRetryingCallerFactory callerFactory = new 
RpcRetryingCallerFactory(conf);
     RpcControllerFactory controllerFactory = 
Mockito.mock(RpcControllerFactory.class);
     Mockito.when(controllerFactory.newController()).thenReturn(
-      Mockito.mock(PayloadCarryingRpcController.class));
+      Mockito.mock(HBaseRpcController.class));
     
Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
     
Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory);
     // set the max wait time for the snapshot to complete
@@ -136,7 +136,7 @@ public class TestSnapshotFromAdmin {
     RpcRetryingCallerFactory callerFactory = new 
RpcRetryingCallerFactory(conf);
     RpcControllerFactory controllerFactory = 
Mockito.mock(RpcControllerFactory.class);
     Mockito.when(controllerFactory.newController()).thenReturn(
-      Mockito.mock(PayloadCarryingRpcController.class));
+      Mockito.mock(HBaseRpcController.class));
     
Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
     
Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory);
     Admin admin = new HBaseAdmin(mockConnection);

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
new file mode 100644
index 0000000..6506347
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestCellBlockBuilder {
+
+  private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class);
+
+  private CellBlockBuilder builder;
+
+  @Before
+  public void before() {
+    this.builder = new CellBlockBuilder(HBaseConfiguration.create());
+  }
+
+  @Test
+  public void testBuildCellBlock() throws IOException {
+    doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), null);
+    doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new 
DefaultCodec());
+    doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new 
GzipCodec());
+  }
+
+  static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder builder, 
final Codec codec,
+      final CompressionCodec compressor) throws IOException {
+    doBuildCellBlockUndoCellBlock(builder, codec, compressor, 10, 1, false);
+  }
+
+  static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder builder, 
final Codec codec,
+      final CompressionCodec compressor, final int count, final int size, 
final boolean sized)
+      throws IOException {
+    Cell[] cells = getCells(count, size);
+    CellScanner cellScanner = sized ? getSizedCellScanner(cells)
+        : CellUtil.createCellScanner(Arrays.asList(cells).iterator());
+    ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner);
+    cellScanner = builder.createCellScanner(codec, compressor, bb);
+    int i = 0;
+    while (cellScanner.advance()) {
+      i++;
+    }
+    assertEquals(count, i);
+  }
+
+  static CellScanner getSizedCellScanner(final Cell[] cells) {
+    int size = -1;
+    for (Cell cell : cells) {
+      size += CellUtil.estimatedSerializedSizeOf(cell);
+    }
+    final int totalSize = ClassSize.align(size);
+    final CellScanner cellScanner = CellUtil.createCellScanner(cells);
+    return new SizedCellScanner() {
+      @Override
+      public long heapSize() {
+        return totalSize;
+      }
+
+      @Override
+      public Cell current() {
+        return cellScanner.current();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return cellScanner.advance();
+      }
+    };
+  }
+
+  static Cell[] getCells(final int howMany) {
+    return getCells(howMany, 1024);
+  }
+
+  static Cell[] getCells(final int howMany, final int valueSize) {
+    Cell[] cells = new Cell[howMany];
+    byte[] value = new byte[valueSize];
+    for (int i = 0; i < howMany; i++) {
+      byte[] index = Bytes.toBytes(i);
+      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
+      cells[i] = kv;
+    }
+    return cells;
+  }
+
+  private static final String COUNT = "--count=";
+  private static final String SIZE = "--size=";
+
+  /**
+   * Prints usage and then exits w/ passed <code>errCode</code>
+   * @param errCode
+   */
+  private static void usage(final int errCode) {
+    System.out.println("Usage: IPCUtil [options]");
+    System.out.println("Micro-benchmarking how changed sizes and counts work 
with buffer resizing");
+    System.out.println(" --count  Count of Cells");
+    System.out.println(" --size   Size of Cell values");
+    System.out.println("Example: IPCUtil --count=1024 --size=1024");
+    System.exit(errCode);
+  }
+
+  private static void timerTests(final CellBlockBuilder builder, final int 
count, final int size,
+      final Codec codec, final CompressionCodec compressor) throws IOException 
{
+    final int cycles = 1000;
+    StopWatch timer = new StopWatch();
+    timer.start();
+    for (int i = 0; i < cycles; i++) {
+      timerTest(builder, timer, count, size, codec, compressor, false);
+    }
+    timer.stop();
+    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + 
false + ", count=" +
+        count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+    timer.reset();
+    timer.start();
+    for (int i = 0; i < cycles; i++) {
+      timerTest(builder, timer, count, size, codec, compressor, true);
+    }
+    timer.stop();
+    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + 
true + ", count=" +
+        count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+  }
+
+  private static void timerTest(final CellBlockBuilder builder, final 
StopWatch timer,
+      final int count, final int size, final Codec codec, final 
CompressionCodec compressor,
+      final boolean sized) throws IOException {
+    doBuildCellBlockUndoCellBlock(builder, codec, compressor, count, size, 
sized);
+  }
+
+  /**
+   * For running a few tests of methods herein.
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    int count = 1024;
+    int size = 10240;
+    for (String arg : args) {
+      if (arg.startsWith(COUNT)) {
+        count = Integer.parseInt(arg.replace(COUNT, ""));
+      } else if (arg.startsWith(SIZE)) {
+        size = Integer.parseInt(arg.replace(SIZE, ""));
+      } else {
+        usage(1);
+      }
+    }
+    CellBlockBuilder builder = new 
CellBlockBuilder(HBaseConfiguration.create());
+    ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL);
+    timerTests(builder, count, size, new KeyValueCodec(), null);
+    timerTests(builder, count, size, new KeyValueCodec(), new DefaultCodec());
+    timerTests(builder, count, size, new KeyValueCodec(), new GzipCodec());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java
new file mode 100644
index 0000000..3b0a6b2
--- /dev/null
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestHBaseRpcControllerImpl {
+
+  @Test
+  public void testListOfCellScannerables() throws IOException {
+    List<CellScannable> cells = new ArrayList<CellScannable>();
+    final int count = 10;
+    for (int i = 0; i < count; i++) {
+      cells.add(createCell(i));
+    }
+    HBaseRpcController controller = new HBaseRpcControllerImpl(cells);
+    CellScanner cellScanner = controller.cellScanner();
+    int index = 0;
+    for (; cellScanner.advance(); index++) {
+      Cell cell = cellScanner.current();
+      byte[] indexBytes = Bytes.toBytes(index);
+      assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, 
cell.getValueArray(),
+        cell.getValueOffset(), cell.getValueLength()));
+    }
+    assertEquals(count, index);
+  }
+
+  /**
+   * @param index
+   * @return A faked out 'Cell' that does nothing but return index as its value
+   */
+  static CellScannable createCell(final int index) {
+    return new CellScannable() {
+      @Override
+      public CellScanner cellScanner() {
+        return new CellScanner() {
+          @Override
+          public Cell current() {
+            // Fake out a Cell. All this Cell has is a value that is an int in 
size and equal
+            // to the above 'index' param serialized as an int.
+            return new Cell() {
+              private final int i = index;
+
+              @Override
+              public byte[] getRowArray() {
+                // unused
+                return null;
+              }
+
+              @Override
+              public int getRowOffset() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public short getRowLength() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public byte[] getFamilyArray() {
+                // unused
+                return null;
+              }
+
+              @Override
+              public int getFamilyOffset() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public byte getFamilyLength() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public byte[] getQualifierArray() {
+                // unused
+                return null;
+              }
+
+              @Override
+              public int getQualifierOffset() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public int getQualifierLength() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public long getTimestamp() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public byte getTypeByte() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public long getSequenceId() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public byte[] getValueArray() {
+                return Bytes.toBytes(this.i);
+              }
+
+              @Override
+              public int getValueOffset() {
+                return 0;
+              }
+
+              @Override
+              public int getValueLength() {
+                return Bytes.SIZEOF_INT;
+              }
+
+              @Override
+              public int getTagsOffset() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public int getTagsLength() {
+                // unused
+                return 0;
+              }
+
+              @Override
+              public byte[] getTagsArray() {
+                // unused
+                return null;
+              }
+
+              @Override
+              public long getMvccVersion() {
+                return 0;
+              }
+
+              @Override
+              public byte[] getValue() {
+                return Bytes.toBytes(this.i);
+              }
+
+              @Override
+              public byte[] getFamily() {
+                return null;
+              }
+
+              @Override
+              public byte[] getQualifier() {
+                return null;
+              }
+
+              @Override
+              public byte[] getRow() {
+                return null;
+              }
+            };
+          }
+
+          private boolean hasCell = true;
+
+          @Override
+          public boolean advance() {
+            // We have one Cell only so return true first time then false ever 
after.
+            if (!hasCell) return hasCell;
+            hasCell = false;
+            return true;
+          }
+        };
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index bb580c8..7c4ac02 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -17,180 +17,32 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.log4j.Level;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(SmallTests.class) 
+@Category({ ClientTests.class, SmallTests.class })
 public class TestIPCUtil {
 
-  private static final Log LOG = LogFactory.getLog(TestIPCUtil.class);
-
-  IPCUtil util;
-  @Before
-  public void before() {
-    this.util = new IPCUtil(new Configuration());
-  }
-
   @Test
-  public void testBuildCellBlock() throws IOException {
-    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
-    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new 
DefaultCodec());
-    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new 
GzipCodec());
-  }
-
-  static void doBuildCellBlockUndoCellBlock(final IPCUtil util,
-      final Codec codec, final CompressionCodec compressor)
-  throws IOException {
-    doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
-  }
-
-  static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec 
codec,
-    final CompressionCodec compressor, final int count, final int size, final 
boolean sized)
-  throws IOException {
-    Cell [] cells = getCells(count, size);
-    CellScanner cellScanner = sized? getSizedCellScanner(cells):
-      CellUtil.createCellScanner(Arrays.asList(cells).iterator());
-    ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
-    cellScanner = util.createCellScanner(codec, compressor, bb);
-    int i = 0;
-    while (cellScanner.advance()) {
-      i++;
-    }
-    assertEquals(count, i);
-  }
-
-  static CellScanner getSizedCellScanner(final Cell [] cells) {
-    int size = -1;
-    for (Cell cell: cells) {
-      size += CellUtil.estimatedSerializedSizeOf(cell);
-    }
-    final int totalSize = ClassSize.align(size);
-    final CellScanner cellScanner = CellUtil.createCellScanner(cells);
-    return new SizedCellScanner() {
-      @Override
-      public long heapSize() {
-        return totalSize;
-      }
-
-      @Override
-      public Cell current() {
-        return cellScanner.current();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return cellScanner.advance();
-      }
-    };
-  }
-
-  static Cell [] getCells(final int howMany) {
-    return getCells(howMany, 1024);
-  }
-
-  static Cell [] getCells(final int howMany, final int valueSize) {
-    Cell [] cells = new Cell[howMany];
-    byte [] value = new byte[valueSize];
-    for (int i = 0; i < howMany; i++) {
-      byte [] index = Bytes.toBytes(i);
-      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
-      cells[i] = kv;
-    }
-    return cells;
-  }
-
-  private static final String COUNT = "--count=";
-  private static final String SIZE = "--size=";
-
-  /**
-   * Prints usage and then exits w/ passed <code>errCode</code>
-   * @param errCode
-   */
-  private static void usage(final int errCode) {
-    System.out.println("Usage: IPCUtil [options]");
-    System.out.println("Micro-benchmarking how changed sizes and counts work 
with buffer resizing");
-    System.out.println(" --count  Count of Cells");
-    System.out.println(" --size   Size of Cell values");
-    System.out.println("Example: IPCUtil --count=1024 --size=1024");
-    System.exit(errCode);
-  }
-
-  private static void timerTests(final IPCUtil util, final int count, final 
int size,
-      final Codec codec, final CompressionCodec compressor)
-  throws IOException {
-    final int cycles = 1000;
-    StopWatch timer = new StopWatch();
-    timer.start();
-    for (int i = 0; i < cycles; i++) {
-      timerTest(util, timer, count, size, codec, compressor, false);
-    }
-    timer.stop();
-    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + 
false +
-        ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() 
+ "ms");
-    timer.reset();
-    timer.start();
-    for (int i = 0; i < cycles; i++) {
-      timerTest(util, timer, count, size, codec, compressor, true);
-    }
-    timer.stop();
-    LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + 
true +
-      ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + 
"ms");
-  }
-
-  private static void timerTest(final IPCUtil util, final StopWatch timer, 
final int count,
-      final int size, final Codec codec, final CompressionCodec compressor, 
final boolean sized)
-  throws IOException {
-    doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
-  }
-
-  /**
-   * For running a few tests of methods herein.
-   * @param args
-   * @throws IOException
-   */
-  public static void main(String[] args) throws IOException {
-    int count = 1024;
-    int size = 10240;
-    for (String arg: args) {
-      if (arg.startsWith(COUNT)) {
-        count = Integer.parseInt(arg.replace(COUNT, ""));
-      } else if (arg.startsWith(SIZE)) {
-        size = Integer.parseInt(arg.replace(SIZE, ""));
-      } else {
-        usage(1);
-      }
-    }
-    IPCUtil util = new IPCUtil(HBaseConfiguration.create());
-    ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL);
-    timerTests(util, count, size,  new KeyValueCodec(), null);
-    timerTests(util, count, size,  new KeyValueCodec(), new DefaultCodec());
-    timerTests(util, count, size,  new KeyValueCodec(), new GzipCodec());
+  public void testWrapException() throws Exception {
+    final InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 0);
+    assertTrue(wrapException(address, new ConnectException()) instanceof 
ConnectException);
+    assertTrue(
+      wrapException(address, new SocketTimeoutException()) instanceof 
SocketTimeoutException);
+    assertTrue(wrapException(address, new ConnectionClosingException(
+        "Test AbstractRpcClient#wrapException")) instanceof 
ConnectionClosingException);
+    assertTrue(
+      wrapException(address, new CallTimeoutException("Test 
AbstractRpcClient#wrapException"))
+          .getCause() instanceof CallTimeoutException);
   }
 }

Reply via email to