Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2704#discussion_r193462295

    --- Diff: storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java ---
    @@ -83,48 +88,51 @@ public PacemakerServer(IServerMessageHandler handler, Map<String, Object> config
             ThreadFactory bossFactory = new NettyRenameThreadFactory("server-boss");
             ThreadFactory workerFactory = new NettyRenameThreadFactory("server-worker");
    -        NioServerSocketChannelFactory factory;
    -        if (maxWorkers > 0) {
    -            factory =
    -                new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
    -                                                  Executors.newCachedThreadPool(workerFactory),
    -                                                  maxWorkers);
    -        } else {
    -            factory =
    -                new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
    -                                                  Executors.newCachedThreadPool(workerFactory));
    -        }
    +        this.bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory);
    +        // 0 means DEFAULT_EVENT_LOOP_THREADS
    +        // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
    +        this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
    +
    +        LOG.info("Create Netty Server " + name() + ", buffer_size: " + FIVE_MB_IN_BYTES + ", maxWorkers: " + maxWorkers);
    -        bootstrap = new ServerBootstrap(factory);
    -        bootstrap.setOption("tcpNoDelay", true);
    -        bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES);
    -        bootstrap.setOption("keepAlive", true);
             int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
    -        ChannelPipelineFactory pipelineFactory =
    -            new ThriftNettyServerCodec(this, config, authMethod, thriftMessageMaxSize)
    -                .pipelineFactory();
    -        bootstrap.setPipelineFactory(pipelineFactory);
    -        Channel channel = bootstrap.bind(new InetSocketAddress(port));
    -        allChannels.add(channel);
    +        ServerBootstrap bootstrap = new ServerBootstrap()
    +            .group(bossEventLoopGroup, workerEventLoopGroup)
    +            .channel(NioServerSocketChannel.class)
    +            .childOption(ChannelOption.TCP_NODELAY, true)
    +            .childOption(ChannelOption.SO_SNDBUF, FIVE_MB_IN_BYTES)
    +            .childOption(ChannelOption.SO_KEEPALIVE, true)
    +            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
    +            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    +            .childHandler(new ThriftNettyServerCodec(this, config, authMethod, thriftMessageMaxSize));
    +
    +        try {
    +            ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
    +            allChannels.add(channelFuture.channel());
    +        } catch (InterruptedException e) {
    +            throw new RuntimeException(e);
    --- End diff --

    I'm not sure what to replace this with. Do you have something in mind?
---