Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2704#discussion_r193438481
--- Diff:
storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java ---
@@ -83,48 +88,51 @@ public PacemakerServer(IServerMessageHandler handler,
Map<String, Object> config
ThreadFactory bossFactory = new
NettyRenameThreadFactory("server-boss");
ThreadFactory workerFactory = new
NettyRenameThreadFactory("server-worker");
- NioServerSocketChannelFactory factory;
- if (maxWorkers > 0) {
- factory =
- new
NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-
Executors.newCachedThreadPool(workerFactory),
- maxWorkers);
- } else {
- factory =
- new
NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
-
Executors.newCachedThreadPool(workerFactory));
- }
+ this.bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory);
+ // 0 means DEFAULT_EVENT_LOOP_THREADS
+ //
https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+ this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ?
maxWorkers : 0, workerFactory);
+
+ LOG.info("Create Netty Server " + name() + ", buffer_size: " +
FIVE_MB_IN_BYTES + ", maxWorkers: " + maxWorkers);
- bootstrap = new ServerBootstrap(factory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES);
- bootstrap.setOption("keepAlive", true);
int thriftMessageMaxSize = (Integer)
config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
- ChannelPipelineFactory pipelineFactory =
- new ThriftNettyServerCodec(this, config, authMethod,
thriftMessageMaxSize)
- .pipelineFactory();
- bootstrap.setPipelineFactory(pipelineFactory);
- Channel channel = bootstrap.bind(new InetSocketAddress(port));
- allChannels.add(channel);
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .group(bossEventLoopGroup, workerEventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.SO_SNDBUF, FIVE_MB_IN_BYTES)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(8 * 1024, 32 * 1024))
+ .childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
+ .childHandler(new ThriftNettyServerCodec(this, config,
authMethod, thriftMessageMaxSize));
+
+ try {
+ ChannelFuture channelFuture = bootstrap.bind(new
InetSocketAddress(port)).sync();
+ allChannels.add(channelFuture.channel());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
--- End diff --
Consider reusing code here; similar logic already exists in
storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
---