GitHub user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2704#discussion_r193452701 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java --- @@ -135,19 +141,29 @@ saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false)); LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}", host, port, bufferSize, lowWatermark, highWatermark); - int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)); int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts); // Initiate connection to remote destination - bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus); + this.eventLoopGroup = eventLoopGroup; + // Initiate connection to remote destination + bootstrap = new Bootstrap() + .group(this.eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, bufferSize) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark)) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) --- End diff -- If we are using the pooled allocator, have we validated that we release the buffers properly every time we are done with them?
---