Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2704#discussion_r193452701
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java ---
@@ -135,19 +141,29 @@
         saslChannelReady.set(!ObjectReader.getBoolean(topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
         LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark: {}, highWatermark: {}",
             host, port, bufferSize, lowWatermark, highWatermark);
-        int messageBatchSize = ObjectReader.getInt(topoConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
         int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
         // Initiate connection to remote destination
-        bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
+        this.eventLoopGroup = eventLoopGroup;
+        // Initiate connection to remote destination
+        bootstrap = new Bootstrap()
+            .group(this.eventLoopGroup)
+            .channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_SNDBUF, bufferSize)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWatermark, highWatermark))
+            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
--- End diff ---
If we are using the pooled allocator, have we validated that every buffer is
released properly each time?
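For context, a minimal self-contained sketch (not Storm code, and the class name is hypothetical) of the reference-counting contract that `PooledByteBufAllocator` imposes: every `ByteBuf` taken from the pool starts with a reference count of 1 and must be released exactly once, typically in a `finally` block, or the pooled memory leaks:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;

public class PooledReleaseSketch {
    public static void main(String[] args) {
        // Allocate from the same pooled allocator the bootstrap configures.
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(256);
        try {
            // Use the buffer; refCnt() is 1 after allocation.
            buf.writeInt(42);
            System.out.println("refCnt before release: " + buf.refCnt());
        } finally {
            // Without this release the pooled chunk leaks; release() returns
            // true once the reference count drops to zero and the buffer is
            // returned to the pool.
            boolean deallocated = ReferenceCountUtil.release(buf);
            System.out.println("deallocated: " + deallocated);
        }
    }
}
```

Running with `-Dio.netty.leakDetection.level=paranoid` (and exercising GC) is one way to surface any path where a pooled buffer escapes without a matching release.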
---