matt fisher created FLUME-2221:
----------------------------------
Summary: NIO Worker threads remain open when connection fails
Key: FLUME-2221
URL: https://issues.apache.org/jira/browse/FLUME-2221
Project: Flume
Issue Type: Bug
Components: Client SDK
Affects Versions: v1.4.0
Environment: Flume 1.4.0
Java 1.7.0_10
Reporter: matt fisher
When LoadBalancingLog4jAppender loses its connection to Flume, each subsequent
attempt to log spawns new NIO threads and leaves them running. The threads come
from NettyAvroRpcClient's attempt to instantiate a NettyTransceiver. When the
transceiver fails to connect in its constructor, an exception is thrown, but the
NIO threads that the NioClientSocketChannelFactory spawned in its Boss and
Worker pools are never shut down. As a result, more threads are started on
every attempt to send a log event.
The code in question, from NettyAvroRpcClient.java:
{code}
transceiver = new NettyTransceiver(this.address,
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(new TransceiverThreadFactory(
"Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
Executors.newCachedThreadPool(new TransceiverThreadFactory(
"Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
tu.toMillis(timeout));
{code}
Because all of the arguments are instantiated inline, no reference to them
remains from which to clean them up if the constructor throws an exception.
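To make the failure mode concrete, here is a minimal JDK-only sketch of the same pattern, with no Netty or Flume involved. FailingTransport, LeakDemo and LIVE_LOOPS are hypothetical names: FailingTransport plays the role of NettyTransceiver by starting a long-running loop on the executor it is given (as a Boss or Worker thread would) and then throwing IOException. Because the executor is created inline in the argument list, nothing can shut it down afterwards.

{code:java}
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for NettyTransceiver: starts a long-running loop on
// the supplied executor, then fails to "connect".
class FailingTransport {
    static final AtomicInteger LIVE_LOOPS = new AtomicInteger();

    FailingTransport(ExecutorService boss) throws IOException {
        CountDownLatch started = new CountDownLatch(1);
        boss.execute(() -> {
            LIVE_LOOPS.incrementAndGet();
            started.countDown();
            try {
                // Simulates a selector loop that runs until interrupted.
                new CountDownLatch(1).await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                LIVE_LOOPS.decrementAndGet();
            }
        });
        try {
            started.await(); // ensure the loop thread is really running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        throw new IOException("connection refused (simulated)");
    }
}

class LeakDemo {
    // Daemon threads only so that the demo JVM can still exit despite the leak.
    static final ThreadFactory DAEMON = r -> {
        Thread t = new Thread(r, "demo boss");
        t.setDaemon(true);
        return t;
    };

    /** Mirrors the original code: the executor is created inline. */
    static void attemptInline() {
        try {
            new FailingTransport(Executors.newCachedThreadPool(DAEMON));
        } catch (IOException expected) {
            // No reference to the pool survives, so it can never be shut down.
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            attemptInline();
        }
        System.out.println("live loops after 3 failed attempts: "
            + FailingTransport.LIVE_LOOPS.get());
    }
}
{code}

Each failed attempt leaves one more loop thread running, so LeakDemo.main should report 3 live loops after three attempts, which is the same growth the appender shows on every failed send.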
I propose that we instantiate these objects explicitly, so that if the
NettyTransceiver constructor throws an exception, we can still clean up the
threads that were spawned.
Something along the lines of:
{code}
callTimeoutPool = Executors.newCachedThreadPool(
    new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
// Declared outside the try block (and initialized to null) so that it is in
// scope, and definitely assigned, inside the catch block.
NioClientSocketChannelFactory factory = null;
try {
  // Created before the NettyTransceiver constructor call, rather than inline
  // in its argument list, so a reference survives if the constructor throws.
  factory = new NioClientSocketChannelFactory(
      Executors.newCachedThreadPool(new TransceiverThreadFactory(
          "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
      Executors.newCachedThreadPool(new TransceiverThreadFactory(
          "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
  transceiver = new NettyTransceiver(this.address, factory,
      tu.toMillis(timeout));
  avroClient =
      SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
      transceiver);
} catch (IOException ex) {
  /* Graceful shutdown (from
   * http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelFactory.html):
   * To shut down a network application service which is managed by a
   * factory, you should follow these steps:
   *   1. close all channels created by the factory and their child channels,
   *      usually using ChannelGroup.close(), and
   *   2. call releaseExternalResources().
   * releaseExternalResources() releases the Boss and I/O Worker executors
   * that were passed to the factory above. */
  if (factory != null) {
    factory.releaseExternalResources();
  }
  throw new FlumeException(this + ": RPC connection error", ex);
}
setState(ConnState.READY);
{code}
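The same JDK-only stand-in can illustrate the proposed fix. CleanupDemo, FailingTransport and LIVE_LOOPS are again hypothetical; ExecutorService.shutdownNow() stands in for releaseExternalResources(), and IllegalStateException stands in for FlumeException. The executor is held in a local variable created before the constructor call, so the catch block can shut it down before rethrowing.

{code:java}
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CleanupDemo {
    static final AtomicInteger LIVE_LOOPS = new AtomicInteger();

    // Hypothetical stand-in for NettyTransceiver: starts a long-running loop
    // on the supplied executor, then fails to "connect".
    static final class FailingTransport {
        FailingTransport(ExecutorService boss) throws IOException {
            CountDownLatch started = new CountDownLatch(1);
            boss.execute(() -> {
                LIVE_LOOPS.incrementAndGet();
                started.countDown();
                try {
                    new CountDownLatch(1).await(); // runs until interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    LIVE_LOOPS.decrementAndGet();
                }
            });
            try {
                started.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            throw new IOException("connection refused (simulated)");
        }
    }

    /** Proposed pattern: keep a reference, release it if construction fails. */
    static FailingTransport connect() throws InterruptedException {
        ExecutorService boss = Executors.newCachedThreadPool();
        try {
            return new FailingTransport(boss);
        } catch (IOException ex) {
            boss.shutdownNow();                         // interrupts the running loop
            boss.awaitTermination(5, TimeUnit.SECONDS); // wait for it to exit
            // Stand-in for FlumeException in the real client.
            throw new IllegalStateException("RPC connection error", ex);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            try {
                connect();
            } catch (IllegalStateException expected) {
                // The connection failed, but nothing was left running.
            }
        }
        System.out.println("live loops after 3 failed attempts: "
            + LIVE_LOOPS.get());
    }
}
{code}

Here CleanupDemo.main should report 0 live loops: each failed attempt interrupts and reaps its own loop thread before the exception propagates.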
--
This message was sent by Atlassian JIRA
(v6.1#6144)