This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 272805c32e9f2200716aa60310aaf72670b5baf9 Author: Duo Zhang <[email protected]> AuthorDate: Thu Aug 4 22:31:58 2022 +0800 HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (#4676) Signed-off-by: Balazs Meszaros <[email protected]> (cherry picked from commit fb529e23526eaa250bdd355db3c2f0dedc55c3e8) Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java --- .../hbase/ipc/BufferCallBeforeInitHandler.java | 7 ++ .../hadoop/hbase/ipc/NettyRpcConnection.java | 78 ++++++++++++---------- 2 files changed, 50 insertions(+), 35 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java index 53ac3d86fe5..5f54ef1c603 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -34,6 +34,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; @InterfaceAudience.Private class BufferCallBeforeInitHandler extends ChannelDuplexHandler { + static final String NAME = "BufferCall"; + private enum BufferCallAction { FLUSH, FAIL @@ -78,6 +80,11 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler { } } + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + // do not flush anything out + } + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof BufferCallEvent) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index c4e474e0694..2d51d1826c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -51,7 +51,7 @@ import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; @@ -156,14 +156,14 @@ class NettyRpcConnection extends RpcConnection { private void established(Channel ch) throws IOException { assert eventLoop.inEventLoop(); - ChannelPipeline p = ch.pipeline(); - String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name(); - p.addBefore(addBeforeHandler, null, - new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); - p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); - p.addBefore(addBeforeHandler, null, - new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); - p.fireUserEventTriggered(BufferCallEvent.success()); + ch.pipeline() + .addBefore(BufferCallBeforeInitHandler.NAME, null, + new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)) + .addBefore(BufferCallBeforeInitHandler.NAME, null, + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)) + .addBefore(BufferCallBeforeInitHandler.NAME, null, + new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)) + .fireUserEventTriggered(BufferCallEvent.success()); } private boolean reloginInProgress; @@ -218,8 +218,8 @@ class NettyRpcConnection extends RpcConnection { failInit(ch, e); return; } - ch.pipeline().addFirst("SaslDecoder", new SaslChallengeDecoder()).addAfter("SaslDecoder", - "SaslHandler", saslHandler); + ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder()) + .addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler); NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() { @Override @@ -230,32 +230,33 @@ class NettyRpcConnection extends RpcConnection { if (saslHandler.isNeedProcessConnectionHeader()) { Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise(); // create the handler to handle the connection header - ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler( - connectionHeaderPromise, conf, connectionHeaderWithLength); + NettyHBaseRpcConnectionHeaderHandler chHandler = + new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf, + connectionHeaderWithLength); // add ReadTimeoutHandler to deal with server doesn't response connection header // because of the different configuration in client side and server side - p.addFirst( - new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); - p.addLast(chHandler); - NettyFutureUtils - .consume(connectionHeaderPromise.addListener(new FutureListener<Boolean>() { - @Override - public void operationComplete(Future<Boolean> future) throws Exception { - if (future.isSuccess()) { - ChannelPipeline p = ch.pipeline(); - p.remove(ReadTimeoutHandler.class); - p.remove(NettyHBaseRpcConnectionHeaderHandler.class); - // don't send connection header, NettyHbaseRpcConnectionHeaderHandler - // sent it already - established(ch); - } else { - final Throwable error = future.cause(); - scheduleRelogin(error); - failInit(ch, toIOE(error)); - } + final String readTimeoutHandlerName = "ReadTimeout"; + p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName, + new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)) + .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler); + NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() { + @Override + public void operationComplete(Future<Boolean> future) throws Exception { + if (future.isSuccess()) { + ChannelPipeline p = ch.pipeline(); + p.remove(readTimeoutHandlerName); + p.remove(NettyHBaseRpcConnectionHeaderHandler.class); + // don't send connection header, NettyHbaseRpcConnectionHeaderHandler + // sent it already + established(ch); + } else { + final Throwable error = future.cause(); + scheduleRelogin(error); + failInit(ch, toIOE(error)); } - })); + } + }); } else { // send the connection header to server NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate()); @@ -278,8 +279,15 @@ class NettyRpcConnection extends RpcConnection { .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) - .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) - .remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() { + .handler(new ChannelInitializer<Channel>() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME, + new BufferCallBeforeInitHandler()); + } + }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect() + .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {
