aajisaka commented on a change in pull request #3966:
URL: https://github.com/apache/hadoop/pull/3966#discussion_r818807465
##########
File path:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
##########
@@ -4025,4 +4343,348 @@ protected int getMaxIdleTime() {
public String getServerName() {
return serverName;
}
+
+ // avoid netty trying to "guess" an appropriate buffer size.
+ private static final RecvByteBufAllocator IPC_RECVBUF_ALLOCATOR =
+ new FixedRecvByteBufAllocator(NIO_BUFFER_LIMIT);
+
+ private class NettyListener implements Listener<io.netty.channel.Channel> {
+ private ServerBootstrap bootstrap;
+ private NettyThreadFactory listenerFactory;
+ private NettyThreadFactory readerFactory;
+ private ChannelGroup acceptChannels =
+ new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ private int backlogLength = conf.getInt(
+ CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+ CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
+ private EventLoopGroup acceptors;
+ private EventLoopGroup readers;
+
+ NettyListener(int port) throws IOException {
+ if (!LOG.isDebugEnabled()) {
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
+ }
+ Class<? extends io.netty.channel.socket.ServerSocketChannel>
channelClass;
+ listenerFactory = new NettyThreadFactory("Netty Socket Acceptor", port);
+ readerFactory = new NettyThreadFactory("Netty Socket Reader", port);
+
+ // netty's readers double as responders so double the readers to
+ // compensate.
+ int numReaders = 2 * getNumReaders();
+ // Attempt to use native transport if available.
+ if (Epoll.isAvailable()) { // Linux.
+ channelClass = EpollServerSocketChannel.class;
+ acceptors = new EpollEventLoopGroup(1, listenerFactory);
+ readers = new EpollEventLoopGroup(numReaders, readerFactory);
+ } else if (KQueue.isAvailable()) { // OS X/BSD.
+ channelClass = KQueueServerSocketChannel.class;
+ acceptors = new KQueueEventLoopGroup(1, listenerFactory);
+ readers = new KQueueEventLoopGroup(numReaders, readerFactory);
+ } else {
+ channelClass = NioServerSocketChannel.class;
+ acceptors = new NioEventLoopGroup(1, listenerFactory);
+ readers = new NioEventLoopGroup(numReaders, readerFactory);
+ }
+ bootstrap = new ServerBootstrap()
+ .group(acceptors, readers)
+ .channel(channelClass)
+ .option(ChannelOption.SO_BACKLOG, backlogLength)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.RCVBUF_ALLOCATOR, IPC_RECVBUF_ALLOCATOR)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.SO_REUSEADDR, true)
+ .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+ .childHandler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(io.netty.channel.Channel channel)
+ throws Exception {
+ connectionManager.register(new NettyConnection(channel));
+ }});
+
+ InetSocketAddress address = new InetSocketAddress(bindAddress, port);
+ io.netty.channel.Channel channel = Server.bind(Binder.NETTY, bootstrap,
+ address, backlogLength, conf, portRangeConfig);
+ registerAcceptChannel(channel);
+ // If may have been an ephemeral port or port range bind, so update
+ // the thread factories to rename any already created threads.
+ port = ((InetSocketAddress)channel.localAddress()).getPort();
+ listenerFactory.updatePort(port);
+ readerFactory.updatePort(port);
+ }
+
+ @Override
+ public InetSocketAddress getAddress() {
+ return Server.this.getListenerAddress();
+ }
+
+ @Override
+ public void listen(InetSocketAddress addr) throws IOException {
+ registerAcceptChannel(Binder.NETTY.bind(bootstrap, addr, backlogLength));
+ }
+
+ @Override
+ public void registerAcceptChannel(io.netty.channel.Channel channel) {
+ acceptChannels.add(channel);
+ addListenerAddress((InetSocketAddress)channel.localAddress());
+ }
+
+ @Override
+ public void closeAcceptChannels() {
+ acceptChannels.close();
+ }
+
+ @Override
+ public void start() {
+ connectionManager.startIdleScan();
+ }
+
+ @Override
+ public void interrupt() {
+ doStop();
+ }
+
+ @Override
+ public void doStop() {
+ try {
+ //TODO : Add Boolean stopped to avoid double stoppage.
+
+ // closing will send events to the bootstrap's event loop groups.
+ closeAcceptChannels();
+ connectionManager.stopIdleScan();
+ connectionManager.closeAll();
+ // shutdown the event loops to reject all further events.
+ ServerBootstrapConfig config = bootstrap.config();
+ //config.group().shutdownGracefully().awaitUninterruptibly();
+ //config.childGroup().shutdownGracefully().awaitUninterruptibly();
+ acceptors.shutdownGracefully().awaitUninterruptibly();
+ readers.shutdownGracefully().awaitUninterruptibly();
+ listenerFactory.close();
+ readerFactory.close();
+ } catch (IOException ioe) {
+ LOG.warn("Unable to shutdown Netty listener and reader threads : ",
ioe);
+ }
+ finally {
+ IOUtils.cleanupWithLogger(LOG, listenerFactory, readerFactory);
+ }
+ }
+ }
+
+ @ChannelHandler.Sharable
+ private class NettyResponder extends ChannelOutboundHandlerAdapter
+ implements Responder {
+ @Override
+ public void start() {}
+ @Override
+ public void interrupt() {}
+ // called by handlers.
+ // TODO: Is queuing required similar to the NioResponder implementation ?
+ @Override
+ public void doRespond(RpcCall call) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Thread.currentThread().getName() +
+ ": responding to " + call);
+ }
+ NettyConnection connection = call.connection();
+ io.netty.channel.Channel channel = connection.channel;
+ channel.writeAndFlush(call, channel.voidPromise());
+ }
+ // called by the netty context. do not call externally.
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
+ ChannelPromise promise) {
+ if (msg instanceof RpcCall) {
+ RpcCall call = (RpcCall)msg;
+ try {
+ if (call.connection.useWrap) {
+ wrapWithSasl(call);
+ }
+ byte[] response = call.rpcResponse.array();
+ msg = Unpooled.wrappedBuffer(response);
+ rpcMetrics.incrSentBytes(response.length);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Thread.currentThread().getName() +
+ ": responding to " + call +
+ " Wrote " + response.length + " bytes.");
+ }
+ } catch (Throwable e) {
+ LOG.warn(Thread.currentThread().getName() +
+ ", call " + call + ": output error");
+ ctx.close();
+ return;
+ } finally {
+ call.connection.decRpcCount();
+ }
+ }
+ ctx.write(msg, promise);
+ }
+ }
+
+ private class NettyConnection extends Connection<io.netty.channel.Channel> {
+ NettyConnection(io.netty.channel.Channel channel)
+ throws Exception {
+ super(channel, (InetSocketAddress) channel.localAddress(),
+ (InetSocketAddress) channel.remoteAddress());
+ ChannelInboundHandler decoder = new ByteToMessageDecoder() {
+ @Override
+ public void decode(ChannelHandlerContext ctx, ByteBuf in,
+ List<Object> out) throws Exception {
+ doRead(in);
+ }
+
+ // client closed the connection.
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ connectionManager.close(NettyConnection.this);
+ }
+ };
+
+ //TODO: Make the self signed certificate creation optional for testing
+ // purposes only.
+
+ SslContext sslCtx = null;
+
+ SslHandler sslHandler = null;
+
+ if (enableNettyTesting()) {
+ SelfSignedCertificate ssc = null;
+
+ try {
+ ssc = new SelfSignedCertificate();
+ } catch (CertificateException e) {
+ throw new IOException(
+ "Exception while creating a SelfSignedCertificate object.", e);
+ }
+
+ try {
+ sslCtx =
+ SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+ .build();
+ } catch (SSLException e) {
+ throw new IOException("Exception while building a SSLContext", e);
+ }
+
+ sslHandler = sslCtx.newHandler(channel.alloc());
+ } else {
+ //TODO: Find where we will get the location to ssl-server.xml and
+ // ssl-client.xml. For now we proceed with the assumption that
+ // these configuration files are available
+ String sslServerConfFile = "ssl-server.xml";
+
+ SSLHandlerProvider sslServerHandlerProvider =
+ new SSLHandlerProvider(sslServerConfFile, "TLS", "SHA1withRSA",
+ false);
+
+ sslHandler = sslServerHandlerProvider.getSSLHandler(channel.alloc());
+ }
+
+ if (sslHandler != null) {
+ sslHandler.handshakeFuture()
+ .addListener(new GenericFutureListener<Future<Channel>>() {
+ @Override
+ public void operationComplete(
+ final io.netty.util.concurrent.Future<Channel>
handshakeFuture)
+ throws Exception {
+ if (handshakeFuture.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TLS handshake success");
+ }
+ } else {
+ throw new IOException(
+ "TLS handshake failed." + handshakeFuture.cause());
+ }
+ }
+ });
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding the SSLHandler to the pipeline");
+ }
Review comment:
The if sentence can be removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]