[
https://issues.apache.org/jira/browse/HBASE-21973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nicholas Jiang updated HBASE-21973:
-----------------------------------
Summary: NettyRpcServer performance improvement based on Netty (was:
NettyRpcServer performance improve based on Netty)
> NettyRpcServer performance improvement based on Netty
> -----------------------------------------------------
>
> Key: HBASE-21973
> URL: https://issues.apache.org/jira/browse/HBASE-21973
> Project: HBase
> Issue Type: Improvement
> Reporter: Nicholas Jiang
> Priority: Major
>
> In NettyRpcServer#NettyRpcServer constructor method, we have the following:
> {code:java}
> ServerBootstrap bootstrap = new
> ServerBootstrap().group(eventLoopGroup).channel(channelClass)
> .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
> .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
> .childHandler(new ChannelInitializer<Channel>() {
> @Override
> protected void initChannel(Channel ch) throws Exception {
> ChannelPipeline pipeline = ch.pipeline();
> FixedLengthFrameDecoder preambleDecoder = new
> FixedLengthFrameDecoder(6);
> preambleDecoder.setSingleDecode(true);
> pipeline.addLast("preambleDecoder", preambleDecoder);
> pipeline.addLast("preambleHandler",
> createNettyRpcServerPreambleHandler());
> pipeline.addLast("frameDecoder", new
> NettyRpcFrameDecoder(maxRequestSize));
> pipeline.addLast("decoder", new
> NettyRpcServerRequestDecoder(allChannels, metrics));
> pipeline.addLast("encoder", new
> NettyRpcServerResponseEncoder(metrics));
> }
> });
> try {
> serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
> LOG.info("Bind to {}", serverChannel.localAddress());
> } catch (InterruptedException e) {
> throw new InterruptedIOException(e.getMessage());
> }
> {code}
> In build ServerBootstrap, we would configure ServerSocketChannel options and
> SocketChannel child options to improve rpc perfermance.These options and
> child options are as follows:
> {code:java}
> .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
> .option(ChannelOption.SO_REUSEADDR,
> transportConfig.isReuseAddr())
> .option(ChannelOption.RCVBUF_ALLOCATOR,
> NettyHelper.getRecvByteBufAllocator())
> .option(ChannelOption.ALLOCATOR,
> NettyHelper.getByteBufAllocator())
> .childOption(ChannelOption.SO_KEEPALIVE,
> transportConfig.isKeepAlive())
> .childOption(ChannelOption.TCP_NODELAY,
> transportConfig.isTcpNoDelay())
> .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
> .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
> .childOption(ChannelOption.ALLOCATOR,
> NettyHelper.getByteBufAllocator())
> .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new
> WriteBufferWaterMark(
> transportConfig.getBufferMin(),
> transportConfig.getBufferMax()))
> {code}
> What's more,ChannelPipeline includes NettyRpcFrameDecoder,this decorder
> extends ByteToMessageDecoder.ChannelPipeline is as follows:
> {code:java}
> .childHandler(new ChannelInitializer<Channel>() {
> @Override
> protected void initChannel(Channel ch) throws Exception {
> ChannelPipeline pipeline = ch.pipeline();
> FixedLengthFrameDecoder preambleDecoder = new
> FixedLengthFrameDecoder(6);
> preambleDecoder.setSingleDecode(true);
> pipeline.addLast("preambleDecoder", preambleDecoder);
> pipeline.addLast("preambleHandler",
> createNettyRpcServerPreambleHandler());
> pipeline.addLast("frameDecoder", new
> NettyRpcFrameDecoder(maxRequestSize));
> pipeline.addLast("decoder", new
> NettyRpcServerRequestDecoder(allChannels, metrics));
> pipeline.addLast("encoder", new
> NettyRpcServerResponseEncoder(metrics));
> }
> });
> {code}
> Netty provides a convenient decoding tool class ByteToMessageDecoder , this
> class has accumulate bulk unpacking ability, can read bytes from the socket
> as much as possible, then synchronously call the decode method, decode the
> business object, and form a List. Finally, the traversal traverses the List
> and submits it to ChannelPipeline for processing.
> Here we can make a small change, submit the submitted content from a single
> command to the entire List, which can reduce the number of pipeline execution
> and increase throughput. This mode has no advantage in low-concurrency
> scenarios, and has a significant performance boost in boost throughput in
> high-concurrency scenarios.
> Will provide an patch and some perf-comparison for this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)