This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 272805c32e9f2200716aa60310aaf72670b5baf9
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Aug 4 22:31:58 2022 +0800

    HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (#4676)
    
    Signed-off-by: Balazs Meszaros <[email protected]>
    (cherry picked from commit fb529e23526eaa250bdd355db3c2f0dedc55c3e8)
    
    Conflicts:
            hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
---
 .../hbase/ipc/BufferCallBeforeInitHandler.java     |  7 ++
 .../hadoop/hbase/ipc/NettyRpcConnection.java       | 78 ++++++++++++----------
 2 files changed, 50 insertions(+), 35 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
index 53ac3d86fe5..5f54ef1c603 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -34,6 +34,8 @@ import 
org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
 @InterfaceAudience.Private
 class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
 
+  static final String NAME = "BufferCall";
+
   private enum BufferCallAction {
     FLUSH,
     FAIL
@@ -78,6 +80,11 @@ class BufferCallBeforeInitHandler extends 
ChannelDuplexHandler {
     }
   }
 
+  @Override
+  public void flush(ChannelHandlerContext ctx) throws Exception {
+    // do not flush anything out
+  }
+
   @Override
   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws 
Exception {
     if (evt instanceof BufferCallEvent) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index c4e474e0694..2d51d1826c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -51,7 +51,7 @@ import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -156,14 +156,14 @@ class NettyRpcConnection extends RpcConnection {
 
   private void established(Channel ch) throws IOException {
     assert eventLoop.inEventLoop();
-    ChannelPipeline p = ch.pipeline();
-    String addBeforeHandler = 
p.context(BufferCallBeforeInitHandler.class).name();
-    p.addBefore(addBeforeHandler, null,
-      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, 
TimeUnit.MILLISECONDS));
-    p.addBefore(addBeforeHandler, null, new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
-    p.addBefore(addBeforeHandler, null,
-      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, 
compressor));
-    p.fireUserEventTriggered(BufferCallEvent.success());
+    ch.pipeline()
+      .addBefore(BufferCallBeforeInitHandler.NAME, null,
+        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, 
TimeUnit.MILLISECONDS))
+      .addBefore(BufferCallBeforeInitHandler.NAME, null,
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
+      .addBefore(BufferCallBeforeInitHandler.NAME, null,
+        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, 
compressor))
+      .fireUserEventTriggered(BufferCallEvent.success());
   }
 
   private boolean reloginInProgress;
@@ -218,8 +218,8 @@ class NettyRpcConnection extends RpcConnection {
       failInit(ch, e);
       return;
     }
-    ch.pipeline().addFirst("SaslDecoder", new 
SaslChallengeDecoder()).addAfter("SaslDecoder",
-      "SaslHandler", saslHandler);
+    ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new 
SaslChallengeDecoder())
+      .addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
     NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
 
       @Override
@@ -230,32 +230,33 @@ class NettyRpcConnection extends RpcConnection {
           if (saslHandler.isNeedProcessConnectionHeader()) {
             Promise<Boolean> connectionHeaderPromise = 
ch.eventLoop().newPromise();
             // create the handler to handle the connection header
-            ChannelHandler chHandler = new 
NettyHBaseRpcConnectionHeaderHandler(
-              connectionHeaderPromise, conf, connectionHeaderWithLength);
+            NettyHBaseRpcConnectionHeaderHandler chHandler =
+              new 
NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
+                connectionHeaderWithLength);
 
             // add ReadTimeoutHandler to deal with server doesn't response 
connection header
             // because of the different configuration in client side and 
server side
-            p.addFirst(
-              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, 
TimeUnit.MILLISECONDS));
-            p.addLast(chHandler);
-            NettyFutureUtils
-              .consume(connectionHeaderPromise.addListener(new 
FutureListener<Boolean>() {
-                @Override
-                public void operationComplete(Future<Boolean> future) throws 
Exception {
-                  if (future.isSuccess()) {
-                    ChannelPipeline p = ch.pipeline();
-                    p.remove(ReadTimeoutHandler.class);
-                    p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
-                    // don't send connection header, 
NettyHbaseRpcConnectionHeaderHandler
-                    // sent it already
-                    established(ch);
-                  } else {
-                    final Throwable error = future.cause();
-                    scheduleRelogin(error);
-                    failInit(ch, toIOE(error));
-                  }
+            final String readTimeoutHandlerName = "ReadTimeout";
+            p.addBefore(BufferCallBeforeInitHandler.NAME, 
readTimeoutHandlerName,
+              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, 
TimeUnit.MILLISECONDS))
+              .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
+            NettyFutureUtils.addListener(connectionHeaderPromise, new 
FutureListener<Boolean>() {
+              @Override
+              public void operationComplete(Future<Boolean> future) throws 
Exception {
+                if (future.isSuccess()) {
+                  ChannelPipeline p = ch.pipeline();
+                  p.remove(readTimeoutHandlerName);
+                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
+                  // don't send connection header, 
NettyHbaseRpcConnectionHeaderHandler
+                  // sent it already
+                  established(ch);
+                } else {
+                  final Throwable error = future.cause();
+                  scheduleRelogin(error);
+                  failInit(ch, toIOE(error));
                 }
-              }));
+              }
+            });
           } else {
             // send the connection header to server
             NettyFutureUtils.safeWrite(ch, 
connectionHeaderWithLength.retainedDuplicate());
@@ -278,8 +279,15 @@ class NettyRpcConnection extends RpcConnection {
       .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
       .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
-      .handler(new 
BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
-      .remoteAddress(remoteAddr).connect().addListener(new 
ChannelFutureListener() {
+      .handler(new ChannelInitializer<Channel>() {
+
+        @Override
+        protected void initChannel(Channel ch) throws Exception {
+          ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
+            new BufferCallBeforeInitHandler());
+        }
+      }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
+      .addListener(new ChannelFutureListener() {
 
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {

Reply via email to