This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4263a18be HBASE-27185 Rewrite NettyRpcServer to decode rpc request 
with netty handler (#4624)
0c4263a18be is described below

commit 0c4263a18be13f412fcf71ebba9715dcc5ca2a12
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Jul 27 09:00:42 2022 +0800

    HBASE-27185 Rewrite NettyRpcServer to decode rpc request with netty handler 
(#4624)
    
    Signed-off-by: Xin Sun <[email protected]>
---
 .../hadoop/hbase/ipc/NettyRpcConnection.java       |   3 -
 .../hbase/security/CryptoAESUnwrapHandler.java     |  47 ---
 .../hbase/security/CryptoAESWrapHandler.java       |  48 ----
 .../NettyHBaseRpcConnectionHeaderHandler.java      |   8 +-
 .../hbase/security/NettyHBaseSaslRpcClient.java    |   4 +-
 .../security/NettyHBaseSaslRpcClientHandler.java   |  14 +-
 .../hadoop/hbase/security/SaslUnwrapHandler.java   |  18 +-
 .../hadoop/hbase/security/SaslWrapHandler.java     |  14 +-
 .../hbase/ipc/NettyHBaseSaslRpcServerHandler.java  | 115 ++++++++
 .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java     |  21 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    |   9 +-
 .../hbase/ipc/NettyRpcServerPreambleHandler.java   |  27 +-
 .../hbase/ipc/NettyRpcServerRequestDecoder.java    |  52 +---
 .../hadoop/hbase/ipc/NettyServerRpcConnection.java |  54 ++--
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  47 +--
 .../hadoop/hbase/ipc/ServerRpcConnection.java      | 317 ++++++++-------------
 .../hadoop/hbase/ipc/SimpleRpcServerResponder.java |  28 ++
 .../hbase/ipc/SimpleServerRpcConnection.java       | 115 ++++++++
 .../hadoop/hbase/security/HBaseSaslRpcServer.java  |  32 ++-
 19 files changed, 507 insertions(+), 466 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 14e8cbc13d3..d211b1b98e5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -223,9 +223,6 @@ class NettyRpcConnection extends RpcConnection {
       public void operationComplete(Future<Boolean> future) throws Exception {
         if (future.isSuccess()) {
           ChannelPipeline p = ch.pipeline();
-          p.remove(SaslChallengeDecoder.class);
-          p.remove(NettyHBaseSaslRpcClientHandler.class);
-
           // check if negotiate with server for connection header is necessary
           if (saslHandler.isNeedProcessConnectionHeader()) {
             Promise<Boolean> connectionHeaderPromise = 
ch.eventLoop().newPromise();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESUnwrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESUnwrapHandler.java
deleted file mode 100644
index 31ed191f91a..00000000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESUnwrapHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
-
-/**
- * Unwrap messages with Crypto AES. Should be placed after a
- * io.netty.handler.codec.LengthFieldBasedFrameDecoder
- */
[email protected]
-public class CryptoAESUnwrapHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
-
-  private final CryptoAES cryptoAES;
-
-  public CryptoAESUnwrapHandler(CryptoAES cryptoAES) {
-    this.cryptoAES = cryptoAES;
-  }
-
-  @Override
-  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
-    byte[] bytes = new byte[msg.readableBytes()];
-    msg.readBytes(bytes);
-    ctx.fireChannelRead(Unpooled.wrappedBuffer(cryptoAES.unwrap(bytes, 0, 
bytes.length)));
-  }
-}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java
deleted file mode 100644
index a99d097ff2d..00000000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
-
-/**
- * wrap messages with Crypto AES.
- */
[email protected]
-public class CryptoAESWrapHandler extends MessageToByteEncoder<ByteBuf> {
-
-  private final CryptoAES cryptoAES;
-
-  public CryptoAESWrapHandler(CryptoAES cryptoAES) {
-    this.cryptoAES = cryptoAES;
-  }
-
-  @Override
-  protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) 
throws Exception {
-    byte[] bytes = new byte[msg.readableBytes()];
-    msg.readBytes(bytes);
-    byte[] wrapperBytes = cryptoAES.wrap(bytes, 0, bytes.length);
-    out.ensureWritable(4 + wrapperBytes.length);
-    out.writeInt(wrapperBytes.length);
-    out.writeBytes(wrapperBytes);
-  }
-}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java
index a75091c5293..20197912dcb 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java
@@ -25,7 +25,6 @@ import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
-import 
org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -92,10 +91,7 @@ public class NettyHBaseRpcConnectionHeaderHandler extends 
SimpleChannelInboundHa
    * Remove handlers for sasl encryption and add handlers for Crypto AES 
encryption
    */
   private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) {
-    p.remove(SaslWrapHandler.class);
-    p.remove(SaslUnwrapHandler.class);
-    String lengthDecoder = 
p.context(LengthFieldBasedFrameDecoder.class).name();
-    p.addAfter(lengthDecoder, null, new CryptoAESUnwrapHandler(cryptoAES));
-    p.addAfter(lengthDecoder, null, new CryptoAESWrapHandler(cryptoAES));
+    p.replace(SaslWrapHandler.class, null, new 
SaslWrapHandler(cryptoAES::wrap));
+    p.replace(SaslUnwrapHandler.class, null, new 
SaslUnwrapHandler(cryptoAES::unwrap));
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
index 9b16a41afe4..ede12258ad1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
@@ -52,9 +52,9 @@ public class NettyHBaseSaslRpcClient extends 
AbstractHBaseSaslRpcClient {
       return;
     }
     // add wrap and unwrap handlers to pipeline.
-    p.addFirst(new SaslWrapHandler(saslClient),
+    p.addFirst(new SaslWrapHandler(saslClient::wrap),
       new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
-      new SaslUnwrapHandler(saslClient));
+      new SaslUnwrapHandler(saslClient::unwrap));
   }
 
   public String getSaslQOP() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
index 7473c3269b0..d4d5cb39746 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
 import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
 import 
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
 
@@ -77,7 +79,7 @@ public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<
 
   private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
     LOG.trace("Sending token size={} from initSASLContext.", response.length);
-    ctx.writeAndFlush(
+    NettyFutureUtils.safeWriteAndFlush(ctx,
       ctx.alloc().buffer(4 + 
response.length).writeInt(response.length).writeBytes(response));
   }
 
@@ -90,8 +92,11 @@ public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<
     if (LOG.isTraceEnabled()) {
       LOG.trace("SASL negotiation for {} is complete", 
provider.getSaslAuthMethod().getName());
     }
-
+    ChannelPipeline p = ctx.pipeline();
     saslRpcClient.setupSaslHandler(ctx.pipeline());
+    p.remove(SaslChallengeDecoder.class);
+    p.remove(this);
+
     setCryptoAESOption();
 
     saslPromise.setSuccess(true);
@@ -110,6 +115,9 @@ public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<
 
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) {
+    // dispose the saslRpcClient when the channel is closed, since 
saslRpcClient is final, it is
+    // safe to reference it in lambda expr.
+    NettyFutureUtils.addListener(ctx.channel().closeFuture(), f -> 
saslRpcClient.dispose());
     try {
       byte[] initialResponse = ugi.doAs(new 
PrivilegedExceptionAction<byte[]>() {
 
@@ -170,14 +178,12 @@ public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<
 
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    saslRpcClient.dispose();
     saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
     ctx.fireChannelInactive();
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    saslRpcClient.dispose();
     saslPromise.tryFailure(cause);
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
index dfc36e4ba31..87e518dae4a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.security;
 
-import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -32,22 +32,20 @@ import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 @InterfaceAudience.Private
 public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
-  private final SaslClient saslClient;
-
-  public SaslUnwrapHandler(SaslClient saslClient) {
-    this.saslClient = saslClient;
+  public interface Unwrapper {
+    byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
   }
 
-  @Override
-  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    SaslUtil.safeDispose(saslClient);
-    ctx.fireChannelInactive();
+  private final Unwrapper unwrapper;
+
+  public SaslUnwrapHandler(Unwrapper unwrapper) {
+    this.unwrapper = unwrapper;
   }
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
     byte[] bytes = new byte[msg.readableBytes()];
     msg.readBytes(bytes);
-    ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, 
bytes.length)));
+    ctx.fireChannelRead(Unpooled.wrappedBuffer(unwrapper.unwrap(bytes, 0, 
bytes.length)));
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
index 21f70e3f1e4..6caf2a3e8f5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.security;
 
-import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -30,17 +30,21 @@ import 
org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
 @InterfaceAudience.Private
 public class SaslWrapHandler extends MessageToByteEncoder<ByteBuf> {
 
-  private final SaslClient saslClient;
+  public interface Wrapper {
+    byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
+  }
+
+  private final Wrapper wrapper;
 
-  public SaslWrapHandler(SaslClient saslClient) {
-    this.saslClient = saslClient;
+  public SaslWrapHandler(Wrapper wrapper) {
+    this.wrapper = wrapper;
   }
 
   @Override
   protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) 
throws Exception {
     byte[] bytes = new byte[msg.readableBytes()];
     msg.readBytes(bytes);
-    byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+    byte[] wrapperBytes = wrapper.wrap(bytes, 0, bytes.length);
     out.ensureWritable(4 + wrapperBytes.length);
     out.writeInt(wrapperBytes.length);
     out.writeBytes(wrapperBytes);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java
new file mode 100644
index 00000000000..e36e0b44c74
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUnwrapHandler;
+import org.apache.hadoop.hbase.security.SaslWrapHandler;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+/**
+ * Implement SASL negotiation logic for rpc server.
+ */
+class NettyHBaseSaslRpcServerHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NettyHBaseSaslRpcServerHandler.class);
+
+  static final String DECODER_NAME = "SaslNegotiationDecoder";
+
+  private final NettyRpcServer rpcServer;
+
+  private final NettyServerRpcConnection conn;
+
+  NettyHBaseSaslRpcServerHandler(NettyRpcServer rpcServer, 
NettyServerRpcConnection conn) {
+    this.rpcServer = rpcServer;
+    this.conn = conn;
+  }
+
+  private void doResponse(ChannelHandlerContext ctx, SaslStatus status, 
Writable rv,
+    String errorClass, String error) throws IOException {
+    // In my testing, have noticed that sasl messages are usually
+    // in the ballpark of 100-200. That's why the initial capacity is 256.
+    ByteBuf resp = ctx.alloc().buffer(256);
+    try (ByteBufOutputStream out = new ByteBufOutputStream(resp)) {
+      out.writeInt(status.state); // write status
+      if (status == SaslStatus.SUCCESS) {
+        rv.write(out);
+      } else {
+        WritableUtils.writeString(out, errorClass);
+        WritableUtils.writeString(out, error);
+      }
+    }
+    NettyFutureUtils.safeWriteAndFlush(ctx, resp);
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
+    LOG.debug("Read input token of size={} for processing by 
saslServer.evaluateResponse()",
+      msg.readableBytes());
+    HBaseSaslRpcServer saslServer = conn.getOrCreateSaslServer();
+    byte[] saslToken = new byte[msg.readableBytes()];
+    msg.readBytes(saslToken, 0, saslToken.length);
+    byte[] replyToken = saslServer.evaluateResponse(saslToken);
+    if (replyToken != null) {
+      LOG.debug("Will send token of size {} from saslServer.", 
replyToken.length);
+      doResponse(ctx, SaslStatus.SUCCESS, new BytesWritable(replyToken), null, 
null);
+    }
+    if (saslServer.isComplete()) {
+      conn.finishSaslNegotiation();
+      String qop = saslServer.getNegotiatedQop();
+      boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+      ChannelPipeline p = ctx.pipeline();
+      if (useWrap) {
+        p.addFirst(new SaslWrapHandler(saslServer::wrap));
+        p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 
4),
+          new SaslUnwrapHandler(saslServer::unwrap));
+      }
+      conn.setupDecoder();
+      p.remove(this);
+      p.remove(DECODER_NAME);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+    LOG.error("Error when doing SASL handshade, provider={}", conn.provider, 
cause);
+    Throwable sendToClient = HBaseSaslRpcServer.unwrap(cause);
+    doResponse(ctx, SaslStatus.ERROR, null, sendToClient.getClass().getName(),
+      sendToClient.getLocalizedMessage());
+    rpcServer.metrics.authenticationFailure();
+    String clientIP = this.toString();
+    // attempting user could be null
+    RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
+      conn.saslServer != null ? conn.saslServer.getAttemptingUser() : 
"Unknown");
+    NettyFutureUtils.safeClose(ctx);
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
index 164934ac247..551d1d3fb40 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
@@ -38,21 +38,18 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
  * @since 2.0.0
  */
 @InterfaceAudience.Private
-public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
+class NettyRpcFrameDecoder extends ByteToMessageDecoder {
 
   private static int FRAME_LENGTH_FIELD_LENGTH = 4;
 
   private final int maxFrameLength;
+  final NettyServerRpcConnection connection;
+
   private boolean requestTooBig;
   private String requestTooBigMessage;
 
-  public NettyRpcFrameDecoder(int maxFrameLength) {
+  public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection 
connection) {
     this.maxFrameLength = maxFrameLength;
-  }
-
-  NettyServerRpcConnection connection;
-
-  void setConnection(NettyServerRpcConnection connection) {
     this.connection = connection;
   }
 
@@ -75,10 +72,10 @@ public class NettyRpcFrameDecoder extends 
ByteToMessageDecoder {
 
     if (frameLength > maxFrameLength) {
       requestTooBig = true;
-      requestTooBigMessage = "RPC data length of " + frameLength + " received 
from "
-        + connection.getHostAddress() + " is greater than max allowed "
-        + connection.rpcServer.maxRequestSize + ". Set \"" + 
SimpleRpcServer.MAX_REQUEST_SIZE
-        + "\" on server to override this limit (not recommended)";
+      requestTooBigMessage =
+        "RPC data length of " + frameLength + " received from " + 
connection.getHostAddress()
+          + " is greater than max allowed " + 
connection.rpcServer.maxRequestSize + ". Set \""
+          + RpcServer.MAX_REQUEST_SIZE + "\" on server to override this limit 
(not recommended)";
 
       NettyRpcServer.LOG.warn(requestTooBigMessage);
 
@@ -132,7 +129,7 @@ public class NettyRpcFrameDecoder extends 
ByteToMessageDecoder {
     // Make sure the client recognizes the underlying exception
     // Otherwise, throw a DoNotRetryIOException.
     if (
-      
VersionInfoUtil.hasMinimumVersion(connection.connectionHeader.getVersionInfo(),
+      VersionInfoUtil.hasMinimumVersion(connection.getVersionInfo(),
         RequestTooBigException.MAJOR_VERSION, 
RequestTooBigException.MINOR_VERSION)
     ) {
       reqTooBig.setResponse(null, null, reqTooBigEx, requestTooBigMessage);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 9032e77bf42..8f12b245030 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -106,11 +106,9 @@ public class NettyRpcServer extends RpcServer {
           ChannelPipeline pipeline = ch.pipeline();
           FixedLengthFrameDecoder preambleDecoder = new 
FixedLengthFrameDecoder(6);
           preambleDecoder.setSingleDecode(true);
-          pipeline.addLast("preambleDecoder", preambleDecoder);
-          pipeline.addLast("preambleHandler", 
createNettyRpcServerPreambleHandler());
-          pipeline.addLast("frameDecoder", new 
NettyRpcFrameDecoder(maxRequestSize));
-          pipeline.addLast("decoder", new 
NettyRpcServerRequestDecoder(allChannels, metrics));
-          pipeline.addLast("encoder", new 
NettyRpcServerResponseEncoder(metrics));
+          pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, 
preambleDecoder);
+          pipeline.addLast(createNettyRpcServerPreambleHandler(),
+            new NettyRpcServerResponseEncoder(metrics));
         }
       });
     try {
@@ -153,6 +151,7 @@ public class NettyRpcServer extends RpcServer {
     }
   }
 
+  // will be overriden in tests
   @InterfaceAudience.Private
   protected NettyRpcServerPreambleHandler 
createNettyRpcServerPreambleHandler() {
     return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
index cf2551e1c08..15a95bc9b09 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -25,6 +26,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
 /**
  * Handle connection preamble.
@@ -33,6 +35,8 @@ import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 @InterfaceAudience.Private
 class NettyRpcServerPreambleHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
 
+  static final String DECODER_NAME = "preambleDecoder";
+
   private final NettyRpcServer rpcServer;
 
   public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) {
@@ -50,12 +54,29 @@ class NettyRpcServerPreambleHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       return;
     }
     ChannelPipeline p = ctx.pipeline();
-    ((NettyRpcFrameDecoder) p.get("frameDecoder")).setConnection(conn);
-    ((NettyRpcServerRequestDecoder) p.get("decoder")).setConnection(conn);
+    if (conn.useSasl) {
+      LengthFieldBasedFrameDecoder decoder =
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
+      decoder.setSingleDecode(true);
+      p.addLast(NettyHBaseSaslRpcServerHandler.DECODER_NAME, decoder);
+      p.addLast(new NettyHBaseSaslRpcServerHandler(rpcServer, conn));
+    } else {
+      conn.setupDecoder();
+    }
+    // add first and then remove, so the single decode decoder will pass the 
remaining bytes to the
+    // handler above.
     p.remove(this);
-    p.remove("preambleDecoder");
+    p.remove(DECODER_NAME);
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+    NettyRpcServer.LOG.warn("Connection {}; caught unexpected downstream 
exception.",
+      ctx.channel().remoteAddress(), cause);
+    NettyFutureUtils.safeClose(ctx);
   }
 
+  // will be overridden in tests
   protected NettyServerRpcConnection createNettyServerRpcConnection(Channel 
channel) {
     return new NettyServerRpcConnection(rpcServer, channel);
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
index cc8b07702b4..2e489e9ab05 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerRequestDecoder.java
@@ -17,64 +17,42 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
+import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 
 /**
  * Decoder for rpc request.
  * @since 2.0.0
  */
 @InterfaceAudience.Private
-class NettyRpcServerRequestDecoder extends ChannelInboundHandlerAdapter {
-
-  private final ChannelGroup allChannels;
+class NettyRpcServerRequestDecoder extends 
SimpleChannelInboundHandler<ByteBuf> {
 
   private final MetricsHBaseServer metrics;
 
-  public NettyRpcServerRequestDecoder(ChannelGroup allChannels, 
MetricsHBaseServer metrics) {
-    this.allChannels = allChannels;
-    this.metrics = metrics;
-  }
-
-  private NettyServerRpcConnection connection;
+  private final NettyServerRpcConnection connection;
 
-  void setConnection(NettyServerRpcConnection connection) {
+  public NettyRpcServerRequestDecoder(MetricsHBaseServer metrics,
+    NettyServerRpcConnection connection) {
+    super(false);
+    this.metrics = metrics;
     this.connection = connection;
   }
 
   @Override
-  public void channelActive(ChannelHandlerContext ctx) throws Exception {
-    allChannels.add(ctx.channel());
-    NettyRpcServer.LOG.trace("Connection {}; # active connections={}",
-      ctx.channel().remoteAddress(), (allChannels.size() - 1));
-    super.channelActive(ctx);
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+    NettyRpcServer.LOG.warn("Connection {}; caught unexpected downstream 
exception.",
+      ctx.channel().remoteAddress(), e);
+    NettyFutureUtils.safeClose(ctx);
   }
 
   @Override
-  public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-    ByteBuf input = (ByteBuf) msg;
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
Exception {
     // 4 bytes length field
-    metrics.receivedBytes(input.readableBytes() + 4);
-    connection.process(input);
-  }
-
-  @Override
-  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    allChannels.remove(ctx.channel());
-    NettyRpcServer.LOG.trace("Disconnection {}; # active connections={}",
-      ctx.channel().remoteAddress(), (allChannels.size() - 1));
-    super.channelInactive(ctx);
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
-    allChannels.remove(ctx.channel());
-    NettyRpcServer.LOG.trace("Connection {}; caught unexpected downstream 
exception.",
-      ctx.channel().remoteAddress(), e);
-    ctx.channel().close();
+    metrics.receivedBytes(msg.readableBytes() + 4);
+    connection.process(msg);
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
index 58be1376953..60db16d77e0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -33,7 +33,7 @@ import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescrip
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 
@@ -49,10 +49,16 @@ class NettyServerRpcConnection extends ServerRpcConnection {
   NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
     super(rpcServer);
     this.channel = channel;
+    rpcServer.allChannels.add(channel);
+    NettyRpcServer.LOG.trace("Connection {}; # active connections={}", 
channel.remoteAddress(),
+      rpcServer.allChannels.size() - 1);
     // register close hook to release resources
-    channel.closeFuture().addListener(f -> {
+    NettyFutureUtils.addListener(channel.closeFuture(), f -> {
       disposeSasl();
       callCleanupIfNeeded();
+      NettyRpcServer.LOG.trace("Disconnection {}; # active connections={}", 
channel.remoteAddress(),
+        rpcServer.allChannels.size() - 1);
+      rpcServer.allChannels.remove(channel);
     });
     InetSocketAddress inetSocketAddress = ((InetSocketAddress) 
channel.remoteAddress());
     this.addr = inetSocketAddress.getAddress();
@@ -64,38 +70,22 @@ class NettyServerRpcConnection extends ServerRpcConnection {
     this.remotePort = inetSocketAddress.getPort();
   }
 
-  void process(final ByteBuf buf) throws IOException, InterruptedException {
-    if (connectionHeaderRead) {
-      this.callCleanup = () -> ReferenceCountUtil.safeRelease(buf);
-      process(new SingleByteBuff(buf.nioBuffer()));
-    } else {
-      ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
-      try {
-        buf.readBytes(connectionHeader);
-      } finally {
-        buf.release();
-      }
-      process(connectionHeader);
-    }
-  }
-
-  void process(ByteBuffer buf) throws IOException, InterruptedException {
-    process(new SingleByteBuff(buf));
+  void setupDecoder() {
+    ChannelPipeline p = channel.pipeline();
+    p.addLast("frameDecoder", new 
NettyRpcFrameDecoder(rpcServer.maxRequestSize, this));
+    p.addLast("decoder", new NettyRpcServerRequestDecoder(rpcServer.metrics, 
this));
   }
 
-  void process(ByteBuff buf) throws IOException, InterruptedException {
+  void process(ByteBuf buf) throws IOException, InterruptedException {
+    if (skipInitialSaslHandshake) {
+      skipInitialSaslHandshake = false;
+      buf.release();
+      return;
+    }
+    this.callCleanup = () -> buf.release();
+    ByteBuff byteBuff = new SingleByteBuff(buf.nioBuffer());
     try {
-      if (skipInitialSaslHandshake) {
-        skipInitialSaslHandshake = false;
-        callCleanupIfNeeded();
-        return;
-      }
-
-      if (useSasl) {
-        saslReadAndProcess(buf);
-      } else {
-        processOneRpc(buf);
-      }
+      processOneRpc(byteBuff);
     } catch (Exception e) {
       callCleanupIfNeeded();
       throw e;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 44a7a74006a..bdd3593cf2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -399,37 +399,6 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     return pbBuf;
   }
 
-  protected BufferChain wrapWithSasl(BufferChain bc) throws IOException {
-    if (!this.connection.useSasl) {
-      return bc;
-    }
-    // Looks like no way around this; saslserver wants a byte array. I have to 
make it one.
-    // THIS IS A BIG UGLY COPY.
-    byte[] responseBytes = bc.getBytes();
-    byte[] token;
-    // synchronization may be needed since there can be multiple Handler
-    // threads using saslServer or Crypto AES to wrap responses.
-    if (connection.useCryptoAesWrap) {
-      // wrap with Crypto AES
-      synchronized (connection.cryptoAES) {
-        token = connection.cryptoAES.wrap(responseBytes, 0, 
responseBytes.length);
-      }
-    } else {
-      synchronized (connection.saslServer) {
-        token = connection.saslServer.wrap(responseBytes, 0, 
responseBytes.length);
-      }
-    }
-    if (RpcServer.LOG.isTraceEnabled()) {
-      RpcServer.LOG
-        .trace("Adding saslServer wrapped token of size " + token.length + " 
as call response.");
-    }
-
-    ByteBuffer[] responseBufs = new ByteBuffer[2];
-    responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
-    responseBufs[1] = ByteBuffer.wrap(token);
-    return new BufferChain(responseBufs);
-  }
-
   @Override
   public long disconnectSince() {
     if (!this.connection.isConnectionOpen()) {
@@ -556,20 +525,6 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
 
   @Override
   public synchronized BufferChain getResponse() {
-    if (connection.useWrap) {
-      /*
-       * wrapping result with SASL as the last step just before sending it 
out, so every message
-       * must have the right increasing sequence number
-       */
-      try {
-        return wrapWithSasl(response);
-      } catch (IOException e) {
-        /* it is exactly the same what setResponse() does */
-        RpcServer.LOG.warn("Exception while creating response " + e);
-        return null;
-      }
-    } else {
-      return response;
-    }
+    return response;
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index e5f01adff8c..efb6630ad9e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -24,15 +24,12 @@ import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
 import io.opentelemetry.context.propagation.TextMapGetter;
-import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Objects;
 import java.util.Properties;
@@ -47,7 +44,6 @@ import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.SaslStatus;
@@ -58,7 +54,7 @@ import 
org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvide
 import 
org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -67,7 +63,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -120,16 +115,9 @@ abstract class ServerRpcConnection implements Closeable {
   protected BlockingService service;
 
   protected SaslServerAuthenticationProvider provider;
-  protected boolean saslContextEstablished;
   protected boolean skipInitialSaslHandshake;
-  private ByteBuffer unwrappedData;
-  // When is this set? FindBugs wants to know! Says NP
-  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
   protected boolean useSasl;
   protected HBaseSaslRpcServer saslServer;
-  protected CryptoAES cryptoAES;
-  protected boolean useWrap = false;
-  protected boolean useCryptoAesWrap = false;
 
   // was authentication allowed with a fallback to simple auth
   protected boolean authenticatedWithFallback;
@@ -164,7 +152,7 @@ abstract class ServerRpcConnection implements Closeable {
   }
 
   public VersionInfo getVersionInfo() {
-    if (connectionHeader.hasVersionInfo()) {
+    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
       return connectionHeader.getVersionInfo();
     }
     return null;
@@ -181,18 +169,24 @@ abstract class ServerRpcConnection implements Closeable {
   /**
    * Set up cell block codecs n
    */
-  private void setupCellBlockCodecs(final ConnectionHeader header) throws 
FatalConnectionException {
+  private void setupCellBlockCodecs() throws FatalConnectionException {
     // TODO: Plug in other supported decoders.
-    if (!header.hasCellBlockCodecClass()) return;
-    String className = header.getCellBlockCodecClass();
-    if (className == null || className.length() == 0) return;
+    if (!connectionHeader.hasCellBlockCodecClass()) {
+      return;
+    }
+    String className = connectionHeader.getCellBlockCodecClass();
+    if (className == null || className.length() == 0) {
+      return;
+    }
     try {
       this.codec = (Codec) 
Class.forName(className).getDeclaredConstructor().newInstance();
     } catch (Exception e) {
       throw new UnsupportedCellCodecException(className, e);
     }
-    if (!header.hasCellBlockCompressorClass()) return;
-    className = header.getCellBlockCompressorClass();
+    if (!connectionHeader.hasCellBlockCompressorClass()) {
+      return;
+    }
+    className = connectionHeader.getCellBlockCompressorClass();
     try {
       this.compressionCodec =
         (CompressionCodec) 
Class.forName(className).getDeclaredConstructor().newInstance();
@@ -202,21 +196,29 @@ abstract class ServerRpcConnection implements Closeable {
   }
 
   /**
-   * Set up cipher for rpc encryption with Apache Commons Crypto n
+   * Set up cipher for rpc encryption with Apache Commons Crypto.
    */
-  private void setupCryptoCipher(final ConnectionHeader header,
-    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws 
FatalConnectionException {
+  private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> 
setupCryptoCipher()
+    throws FatalConnectionException {
     // If simple auth, return
-    if (saslServer == null) return;
+    if (saslServer == null) {
+      return null;
+    }
     // check if rpc encryption with Crypto AES
     String qop = saslServer.getNegotiatedQop();
     boolean isEncryption = 
SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
     boolean isCryptoAesEncryption = isEncryption
       && 
this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", 
false);
-    if (!isCryptoAesEncryption) return;
-    if (!header.hasRpcCryptoCipherTransformation()) return;
-    String transformation = header.getRpcCryptoCipherTransformation();
-    if (transformation == null || transformation.length() == 0) return;
+    if (!isCryptoAesEncryption) {
+      return null;
+    }
+    if (!connectionHeader.hasRpcCryptoCipherTransformation()) {
+      return null;
+    }
+    String transformation = 
connectionHeader.getRpcCryptoCipherTransformation();
+    if (transformation == null || transformation.length() == 0) {
+      return null;
+    }
     // Negotiates AES based on complete saslServer.
     // The Crypto metadata need to be encrypted and send to client.
     Properties properties = new Properties();
@@ -242,6 +244,7 @@ abstract class ServerRpcConnection implements Closeable {
     byte[] inIv = new byte[len];
     byte[] outIv = new byte[len];
 
+    CryptoAES cryptoAES;
     try {
       // generate the cipher meta data with SecureRandom
       CryptoRandom secureRandom = 
CryptoRandomFactory.getCryptoRandom(properties);
@@ -252,19 +255,20 @@ abstract class ServerRpcConnection implements Closeable {
 
       // create CryptoAES for server
       cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, 
inIv, outIv);
-      // create SaslCipherMeta and send to client,
-      // for client, the [inKey, outKey], [inIv, outIv] should be reversed
-      RPCProtos.CryptoCipherMeta.Builder ccmBuilder = 
RPCProtos.CryptoCipherMeta.newBuilder();
-      ccmBuilder.setTransformation(transformation);
-      ccmBuilder.setInIv(getByteString(outIv));
-      ccmBuilder.setInKey(getByteString(outKey));
-      ccmBuilder.setOutIv(getByteString(inIv));
-      ccmBuilder.setOutKey(getByteString(inKey));
-      chrBuilder.setCryptoCipherMeta(ccmBuilder);
-      useCryptoAesWrap = true;
     } catch (GeneralSecurityException | IOException ex) {
       throw new UnsupportedCryptoException(ex.getMessage(), ex);
     }
+    // create SaslCipherMeta and send to client,
+    // for client, the [inKey, outKey], [inIv, outIv] should be reversed
+    RPCProtos.CryptoCipherMeta.Builder ccmBuilder = 
RPCProtos.CryptoCipherMeta.newBuilder();
+    ccmBuilder.setTransformation(transformation);
+    ccmBuilder.setInIv(getByteString(outIv));
+    ccmBuilder.setInKey(getByteString(outKey));
+    ccmBuilder.setOutIv(getByteString(inIv));
+    ccmBuilder.setOutKey(getByteString(inKey));
+    RPCProtos.ConnectionHeaderResponse resp =
+      
RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build();
+    return Pair.newPair(resp, cryptoAES);
   }
 
   private ByteString getByteString(byte[] bytes) {
@@ -327,125 +331,20 @@ abstract class ServerRpcConnection implements Closeable {
     doRespond(() -> bc);
   }
 
-  public void saslReadAndProcess(ByteBuff saslToken) throws IOException, 
InterruptedException {
-    if (saslContextEstablished) {
-      RpcServer.LOG.trace("Read input token of size={} for processing by 
saslServer.unwrap()",
-        saslToken.limit());
-      if (!useWrap) {
-        processOneRpc(saslToken);
-      } else {
-        byte[] b = saslToken.hasArray() ? saslToken.array() : 
saslToken.toBytes();
-        byte[] plaintextData;
-        if (useCryptoAesWrap) {
-          // unwrap with CryptoAES
-          plaintextData = cryptoAES.unwrap(b, 0, b.length);
-        } else {
-          plaintextData = saslServer.unwrap(b, 0, b.length);
-        }
-        // release the request buffer as we have already unwrapped all its 
content
-        callCleanupIfNeeded();
-        processUnwrappedData(plaintextData);
-      }
-    } else {
-      byte[] replyToken;
-      try {
-        if (saslServer == null) {
-          try {
-            saslServer =
-              new HBaseSaslRpcServer(provider, rpcServer.saslProps, 
rpcServer.secretManager);
-          } catch (Exception e) {
-            RpcServer.LOG.error("Error when trying to create instance of 
HBaseSaslRpcServer "
-              + "with sasl provider: " + provider, e);
-            throw e;
-          }
-          RpcServer.LOG.debug("Created SASL server with mechanism={}",
-            provider.getSaslAuthMethod().getAuthMethod());
-        }
-        RpcServer.LOG.debug(
-          "Read input token of size={} for processing by saslServer." + 
"evaluateResponse()",
-          saslToken.limit());
-        replyToken = saslServer
-          .evaluateResponse(saslToken.hasArray() ? saslToken.array() : 
saslToken.toBytes());
-      } catch (IOException e) {
-        RpcServer.LOG.debug("Failed to execute SASL handshake", e);
-        IOException sendToClient = e;
-        Throwable cause = e;
-        while (cause != null) {
-          if (cause instanceof InvalidToken) {
-            sendToClient = (InvalidToken) cause;
-            break;
-          }
-          cause = cause.getCause();
-        }
-        doRawSaslReply(SaslStatus.ERROR, null, 
sendToClient.getClass().getName(),
-          sendToClient.getLocalizedMessage());
-        this.rpcServer.metrics.authenticationFailure();
-        String clientIP = this.toString();
-        // attempting user could be null
-        RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, 
clientIP,
-          saslServer.getAttemptingUser());
-        throw e;
-      } finally {
-        // release the request buffer as we have already unwrapped all its 
content
-        callCleanupIfNeeded();
-      }
-      if (replyToken != null) {
-        if (RpcServer.LOG.isDebugEnabled()) {
-          RpcServer.LOG.debug("Will send token of size " + replyToken.length + 
" from saslServer.");
-        }
-        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), 
null, null);
-      }
-      if (saslServer.isComplete()) {
-        String qop = saslServer.getNegotiatedQop();
-        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
-        ugi =
-          provider.getAuthorizedUgi(saslServer.getAuthorizationID(), 
this.rpcServer.secretManager);
-        RpcServer.LOG.debug(
-          "SASL server context established. Authenticated client: {}. 
Negotiated QoP is {}", ugi,
-          qop);
-        this.rpcServer.metrics.authenticationSuccess();
-        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
-        saslContextEstablished = true;
-      }
+  HBaseSaslRpcServer getOrCreateSaslServer() throws IOException {
+    if (saslServer == null) {
+      saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, 
rpcServer.secretManager);
     }
+    return saslServer;
   }
 
-  private void processUnwrappedData(byte[] inBuf) throws IOException, 
InterruptedException {
-    ReadableByteChannel ch = Channels.newChannel(new 
ByteArrayInputStream(inBuf));
-    // Read all RPCs contained in the inBuf, even partial ones
-    while (true) {
-      int count;
-      if (unwrappedDataLengthBuffer.remaining() > 0) {
-        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
-        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) {
-          return;
-        }
-      }
-
-      if (unwrappedData == null) {
-        unwrappedDataLengthBuffer.flip();
-        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
-          if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received 
ping message");
-          unwrappedDataLengthBuffer.clear();
-          continue; // ping message
-        }
-        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
-      }
-
-      count = this.rpcServer.channelRead(ch, unwrappedData);
-      if (count <= 0 || unwrappedData.remaining() > 0) {
-        return;
-      }
-
-      if (unwrappedData.remaining() == 0) {
-        unwrappedDataLengthBuffer.clear();
-        unwrappedData.flip();
-        processOneRpc(new SingleByteBuff(unwrappedData));
-        unwrappedData = null;
-      }
-    }
+  void finishSaslNegotiation() throws IOException {
+    String qop = saslServer.getNegotiatedQop();
+    ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), 
this.rpcServer.secretManager);
+    RpcServer.LOG.debug(
+      "SASL server context established. Authenticated client: {}. Negotiated 
QoP is {}", ugi, qop);
+    rpcServer.metrics.authenticationSuccess();
+    RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
   }
 
   public void processOneRpc(ByteBuff buf) throws IOException, 
InterruptedException {
@@ -453,6 +352,7 @@ abstract class ServerRpcConnection implements Closeable {
       processRequest(buf);
     } else {
       processConnectionHeader(buf);
+      callCleanupIfNeeded();
       this.connectionHeaderRead = true;
       if (rpcServer.needAuthorization() && !authorizeConnection()) {
         // Throw FatalConnectionException wrapping ACE so client does right 
thing and closes
@@ -486,25 +386,35 @@ abstract class ServerRpcConnection implements Closeable {
     return true;
   }
 
-  // Reads the connection header following version
-  private void processConnectionHeader(ByteBuff buf) throws IOException {
+  private CodedInputStream createCis(ByteBuff buf) {
+    // Here we read in the header. We avoid having pb
+    // do its default 4k allocation for CodedInputStream. We force it to use
+    // backing array.
+    CodedInputStream cis;
     if (buf.hasArray()) {
-      this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
+      cis = UnsafeByteOperations
+        .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), 
buf.limit()).newCodedInput();
     } else {
-      CodedInputStream cis = UnsafeByteOperations
-        .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, 
buf.limit()).newCodedInput();
-      cis.enableAliasing(true);
-      this.connectionHeader = ConnectionHeader.parseFrom(cis);
+      cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, 
buf.limit()), 0, buf.limit())
+        .newCodedInput();
     }
+    cis.enableAliasing(true);
+    return cis;
+  }
+
+  // Reads the connection header following version
+  private void processConnectionHeader(ByteBuff buf) throws IOException {
+    this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));
     String serviceName = connectionHeader.getServiceName();
-    if (serviceName == null) throw new EmptyServiceNameException();
+    if (serviceName == null) {
+      throw new EmptyServiceNameException();
+    }
     this.service = RpcServer.getService(this.rpcServer.services, serviceName);
-    if (this.service == null) throw new UnknownServiceException(serviceName);
-    setupCellBlockCodecs(this.connectionHeader);
-    RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
-      RPCProtos.ConnectionHeaderResponse.newBuilder();
-    setupCryptoCipher(this.connectionHeader, chrBuilder);
-    responseConnectionHeader(chrBuilder);
+    if (this.service == null) {
+      throw new UnknownServiceException(serviceName);
+    }
+    setupCellBlockCodecs();
+    sendConnectionHeaderResponseIfNeeded();
     UserGroupInformation protocolUser = createUser(connectionHeader);
     if (!useSasl) {
       ugi = protocolUser;
@@ -553,25 +463,35 @@ abstract class ServerRpcConnection implements Closeable {
   /**
    * Send the response for connection header
    */
-  private void 
responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
-    throws FatalConnectionException {
+  private void sendConnectionHeaderResponseIfNeeded() throws 
FatalConnectionException {
+    Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = 
setupCryptoCipher();
     // Response the connection header if Crypto AES is enabled
-    if (!chrBuilder.hasCryptoCipherMeta()) return;
+    if (pair == null) {
+      return;
+    }
     try {
-      byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
-      // encrypt the Crypto AES cipher meta data with sasl server, and send to 
client
-      byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
-      Bytes.putBytes(unwrapped, 0, 
Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
-      Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, 
connectionHeaderResBytes.length);
-      byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length);
+      int size = pair.getFirst().getSerializedSize();
       BufferChain bc;
-      try (ByteBufferOutputStream response = new 
ByteBufferOutputStream(wrapped.length + 4);
-        DataOutputStream out = new DataOutputStream(response)) {
-        out.writeInt(wrapped.length);
-        out.write(wrapped);
-        bc = new BufferChain(response.getByteBuffer());
+      try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size);
+        DataOutputStream out = new DataOutputStream(bbOut)) {
+        out.writeInt(size);
+        pair.getFirst().writeTo(out);
+        bc = new BufferChain(bbOut.getByteBuffer());
       }
-      doRespond(() -> bc);
+      doRespond(new RpcResponse() {
+
+        @Override
+        public BufferChain getResponse() {
+          return bc;
+        }
+
+        @Override
+        public void done() {
+          // must switch after sending the connection header response, as the 
client still uses the
+          // original SaslClient to unwrap the data we send back
+          saslServer.switchToCryptoAES(pair.getSecond());
+        }
+      });
     } catch (IOException ex) {
       throw new UnsupportedCryptoException(ex.getMessage(), ex);
     }
@@ -581,7 +501,9 @@ abstract class ServerRpcConnection implements Closeable {
 
   /**
    * n * Has the request header and the request param and optionally encoded 
data buffer all in this
-   * one array. nn
+   * one array.
+   * <p/>
+   * Will be overridden in tests.
    */
   protected void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
     long totalRequestSize = buf.limit();
@@ -589,14 +511,7 @@ abstract class ServerRpcConnection implements Closeable {
     // Here we read in the header. We avoid having pb
     // do its default 4k allocation for CodedInputStream. We force it to use
     // backing array.
-    CodedInputStream cis;
-    if (buf.hasArray()) {
-      cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, 
buf.limit()).newCodedInput();
-    } else {
-      cis = UnsafeByteOperations
-        .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, 
buf.limit()).newCodedInput();
-    }
-    cis.enableAliasing(true);
+    CodedInputStream cis = createCis(buf);
     int headerSize = cis.readRawVarint32();
     offset = cis.getTotalBytesRead();
     Message.Builder builder = RequestHeader.newBuilder();
@@ -737,7 +652,7 @@ abstract class ServerRpcConnection implements Closeable {
   }
 
   private void doBadPreambleHandling(String msg, Exception e) throws 
IOException {
-    SimpleRpcServer.LOG.warn(msg);
+    RpcServer.LOG.warn(msg);
     doRespond(getErrorResponse(msg, e));
   }
 
@@ -762,7 +677,7 @@ abstract class ServerRpcConnection implements Closeable {
     int version = preambleBuffer.get() & 0xFF;
     byte authbyte = preambleBuffer.get();
 
-    if (version != SimpleRpcServer.CURRENT_VERSION) {
+    if (version != RpcServer.CURRENT_VERSION) {
       String msg = getFatalConnectionString(version, authbyte);
       doBadPreambleHandling(msg, new WrongVersionException(msg));
       return false;
@@ -810,34 +725,28 @@ abstract class ServerRpcConnection implements Closeable {
   private static class ByteBuffByteInput extends ByteInput {
 
     private ByteBuff buf;
-    private int offset;
     private int length;
 
-    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
+    ByteBuffByteInput(ByteBuff buf, int length) {
       this.buf = buf;
-      this.offset = offset;
       this.length = length;
     }
 
     @Override
     public byte read(int offset) {
-      return this.buf.get(getAbsoluteOffset(offset));
-    }
-
-    private int getAbsoluteOffset(int offset) {
-      return this.offset + offset;
+      return this.buf.get(offset);
     }
 
     @Override
     public int read(int offset, byte[] out, int outOffset, int len) {
-      this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
+      this.buf.get(offset, out, outOffset, len);
       return len;
     }
 
     @Override
     public int read(int offset, ByteBuffer out) {
       int len = out.remaining();
-      this.buf.get(out, getAbsoluteOffset(offset), len);
+      this.buf.get(out, offset, len);
       return len;
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
index b9d8d3dffc4..db1b380361d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
@@ -28,6 +29,8 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
@@ -217,6 +220,28 @@ class SimpleRpcServerResponder extends Thread {
     }
   }
 
+  private BufferChain wrapWithSasl(HBaseSaslRpcServer saslServer, BufferChain 
bc)
+    throws IOException {
+    // Looks like no way around this; saslserver wants a byte array. I have to 
make it one.
+    // THIS IS A BIG UGLY COPY.
+    byte[] responseBytes = bc.getBytes();
+    byte[] token;
+    // synchronization may be needed since there can be multiple Handler
+    // threads using saslServer or Crypto AES to wrap responses.
+    synchronized (saslServer) {
+      token = saslServer.wrap(responseBytes, 0, responseBytes.length);
+    }
+    if (SimpleRpcServer.LOG.isTraceEnabled()) {
+      SimpleRpcServer.LOG
+        .trace("Adding saslServer wrapped token of size " + token.length + " 
as call response.");
+    }
+
+    ByteBuffer[] responseBufs = new ByteBuffer[2];
+    responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
+    responseBufs[1] = ByteBuffer.wrap(token);
+    return new BufferChain(responseBufs);
+  }
+
   /**
    * Process the response for this call. You need to have the lock on
    * {@link 
org.apache.hadoop.hbase.ipc.SimpleServerRpcConnection#responseWriteLock}
@@ -226,6 +251,9 @@ class SimpleRpcServerResponder extends Thread {
     throws IOException {
     boolean error = true;
     BufferChain buf = resp.getResponse();
+    if (conn.useWrap) {
+      buf = wrapWithSasl(conn.saslServer, buf);
+    }
     try {
       // Send as much data as we can in the non-blocking fashion
       long numBytes = this.simpleRpcServer.channelWrite(conn.channel, buf);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index 51e1bedba57..4c8925d7274 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -34,7 +36,11 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -63,6 +69,11 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
 
   // If initial preamble with version and magic has been read or not.
   private boolean connectionPreambleRead = false;
+  private boolean saslContextEstablished;
+  private ByteBuffer unwrappedData;
+  // When is this set? FindBugs wants to know! Says NP
+  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
+  boolean useWrap = false;
 
   final ConcurrentLinkedDeque<RpcResponse> responseQueue = new 
ConcurrentLinkedDeque<>();
   final Lock responseWriteLock = new ReentrantLock();
@@ -142,6 +153,110 @@ class SimpleServerRpcConnection extends 
ServerRpcConnection {
     }
   }
 
+  private void processUnwrappedData(byte[] inBuf) throws IOException, 
InterruptedException {
+    ReadableByteChannel ch = Channels.newChannel(new 
ByteArrayInputStream(inBuf));
+    // Read all RPCs contained in the inBuf, even partial ones
+    while (true) {
+      int count;
+      if (unwrappedDataLengthBuffer.remaining() > 0) {
+        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
+        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) {
+          return;
+        }
+      }
+
+      if (unwrappedData == null) {
+        unwrappedDataLengthBuffer.flip();
+        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
+          if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received 
ping message");
+          unwrappedDataLengthBuffer.clear();
+          continue; // ping message
+        }
+        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+      }
+
+      count = this.rpcServer.channelRead(ch, unwrappedData);
+      if (count <= 0 || unwrappedData.remaining() > 0) {
+        return;
+      }
+
+      if (unwrappedData.remaining() == 0) {
+        unwrappedDataLengthBuffer.clear();
+        unwrappedData.flip();
+        processOneRpc(new SingleByteBuff(unwrappedData));
+        unwrappedData = null;
+      }
+    }
+  }
+
+  private void saslReadAndProcess(ByteBuff saslToken) throws IOException, 
InterruptedException {
+    if (saslContextEstablished) {
+      RpcServer.LOG.trace("Read input token of size={} for processing by 
saslServer.unwrap()",
+        saslToken.limit());
+      if (!useWrap) {
+        processOneRpc(saslToken);
+      } else {
+        byte[] b = saslToken.hasArray() ? saslToken.array() : 
saslToken.toBytes();
+        byte[] plaintextData = saslServer.unwrap(b, 0, b.length);
+        // release the request buffer as we have already unwrapped all its 
content
+        callCleanupIfNeeded();
+        processUnwrappedData(plaintextData);
+      }
+    } else {
+      byte[] replyToken;
+      try {
+        try {
+          getOrCreateSaslServer();
+        } catch (Exception e) {
+          RpcServer.LOG.error("Error when trying to create instance of 
HBaseSaslRpcServer "
+            + "with sasl provider: " + provider, e);
+          throw e;
+        }
+        RpcServer.LOG.debug("Created SASL server with mechanism={}",
+          provider.getSaslAuthMethod().getAuthMethod());
+        RpcServer.LOG.debug(
+          "Read input token of size={} for processing by saslServer." + 
"evaluateResponse()",
+          saslToken.limit());
+        replyToken = saslServer
+          .evaluateResponse(saslToken.hasArray() ? saslToken.array() : 
saslToken.toBytes());
+      } catch (IOException e) {
+        RpcServer.LOG.debug("Failed to execute SASL handshake", e);
+        Throwable sendToClient = HBaseSaslRpcServer.unwrap(e);
+        doRawSaslReply(SaslStatus.ERROR, null, 
sendToClient.getClass().getName(),
+          sendToClient.getLocalizedMessage());
+        this.rpcServer.metrics.authenticationFailure();
+        String clientIP = this.toString();
+        // attempting user could be null
+        RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, 
clientIP,
+          saslServer.getAttemptingUser());
+        throw e;
+      } finally {
+        // release the request buffer as we have already unwrapped all its 
content
+        callCleanupIfNeeded();
+      }
+      if (replyToken != null) {
+        if (RpcServer.LOG.isDebugEnabled()) {
+          RpcServer.LOG.debug("Will send token of size " + replyToken.length + 
" from saslServer.");
+        }
+        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), 
null, null);
+      }
+      if (saslServer.isComplete()) {
+        String qop = saslServer.getNegotiatedQop();
+        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+        ugi =
+          provider.getAuthorizedUgi(saslServer.getAuthorizationID(), 
this.rpcServer.secretManager);
+        RpcServer.LOG.debug(
+          "SASL server context established. Authenticated client: {}. 
Negotiated QoP is {}", ugi,
+          qop);
+        this.rpcServer.metrics.authenticationSuccess();
+        RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
+        saslContextEstablished = true;
+      }
+    }
+  }
+
   /**
    * Read off the wire. If there is not enough data to read, update the 
connection state with what
    * we have and returns.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
index eb9913174e2..6d375e0014a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcServer.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
+import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import 
org.apache.hadoop.hbase.security.provider.AttemptingUserProvidingSaslServer;
 import 
org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
 import org.apache.hadoop.security.token.SecretManager;
@@ -40,6 +41,7 @@ public class HBaseSaslRpcServer {
 
   private final AttemptingUserProvidingSaslServer serverWithProvider;
   private final SaslServer saslServer;
+  private CryptoAES cryptoAES;
 
   public HBaseSaslRpcServer(SaslServerAuthenticationProvider provider,
     Map<String, String> saslProps, SecretManager<TokenIdentifier> 
secretManager)
@@ -61,16 +63,28 @@ public class HBaseSaslRpcServer {
     SaslUtil.safeDispose(saslServer);
   }
 
+  public void switchToCryptoAES(CryptoAES cryptoAES) {
+    this.cryptoAES = cryptoAES;
+  }
+
   public String getAttemptingUser() {
     return 
serverWithProvider.getAttemptingUser().map(Object::toString).orElse("Unknown");
   }
 
   public byte[] wrap(byte[] buf, int off, int len) throws SaslException {
-    return saslServer.wrap(buf, off, len);
+    if (cryptoAES != null) {
+      return cryptoAES.wrap(buf, off, len);
+    } else {
+      return saslServer.wrap(buf, off, len);
+    }
   }
 
   public byte[] unwrap(byte[] buf, int off, int len) throws SaslException {
-    return saslServer.unwrap(buf, off, len);
+    if (cryptoAES != null) {
+      return cryptoAES.unwrap(buf, off, len);
+    } else {
+      return saslServer.unwrap(buf, off, len);
+    }
   }
 
   public String getNegotiatedQop() {
@@ -92,4 +106,18 @@ public class HBaseSaslRpcServer {
     }
     return tokenIdentifier;
   }
+
+  /**
+   * Unwrap InvalidToken exception, otherwise return the one passed in.
+   */
+  public static Throwable unwrap(Throwable e) {
+    Throwable cause = e;
+    while (cause != null) {
+      if (cause instanceof InvalidToken) {
+        return cause;
+      }
+      cause = cause.getCause();
+    }
+    return e;
+  }
 }

Reply via email to