[
https://issues.apache.org/jira/browse/FLUME-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
matt fisher resolved FLUME-2221.
--------------------------------
Resolution: Not A Problem
My apologies, I was working with 1.3.1, and this has already been dealt with in
1.4.0. Closing
> NIO Worker threads remain open when connection fails
> ----------------------------------------------------
>
> Key: FLUME-2221
> URL: https://issues.apache.org/jira/browse/FLUME-2221
> Project: Flume
> Issue Type: Bug
> Components: Client SDK
> Affects Versions: v1.4.0
> Environment: Flume 1.4.0
> Java 1.7.0_10
> Reporter: matt fisher
>
> When LoadBalancingLog4JAppender loses its connection to Flume, upon
> subsequent attempts to log, it will spawn NIO threads and leave them open.
> This is coming from the NettyAcroRPCClient's attempt to instantiate a
> NettyTransciever. When the Transciever fails to connect in the constructor,
> an exception is thrown, but the NIO threads that were spawned by the
> NioClientSocketChannelFactory in the Boss and Worker pools are never closed.
> This causes more threads to be started for each attempt at sending a log.
> The code in question from NettyAvroRPCClient.java:
> {code}
> transceiver = new NettyTransceiver(this.address,
> new NioClientSocketChannelFactory(
> Executors.newCachedThreadPool(new TransceiverThreadFactory(
> "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
> Executors.newCachedThreadPool(new TransceiverThreadFactory(
> "Avro " + NettyTransceiver.class.getSimpleName() + " I/O
> Worker"))),
> tu.toMillis(timeout));
> {code}
> Because all of the parameters are instantiated inline, there is no reference
> from which to clean them up if the Constructor throws an exception.
> I am proposing we explicitly instantiate these variables so that, if the
> constructor of NettyTransciever throws an exception and returns null, then we
> can clean up the threads that were spawned.
> something along the lines of:
> {code}
> callTimeoutPool = Executors.newCachedThreadPool(
> new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
> NioClientSocketChannelFactory factory;// move declaration out so that it
> is accessible in catch
> try {
> factory = new NioClientSocketChannelFactory(
> Executors.newCachedThreadPool(new TransceiverThreadFactory(
> "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
> Executors.newCachedThreadPool(new TransceiverThreadFactory(
> "Avro " + NettyTransceiver.class.getSimpleName() + " I/O
> Worker"))); //move instantiation out of constructor signature so taht it is
> accessible afer constructor call
> transceiver = new NettyTransceiver(this.address, factory,
> tu.toMillis(timeout));
> avroClient =
> SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
> transceiver);
> } catch (IOException ex) {
> /*Graceful shutdown (from
> http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelFactory.html)
> To shut down a network application service which is managed by a
> factory. you should follow the following steps:
> close all channels created by the factory and their child channels
> usually using ChannelGroup.close(), and
> call releaseExternalResources().*/
> factory.releaseExternalResources();//
> throw new FlumeException(this + ": RPC connection error", ex);
> }
> setState(ConnState.READY);
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.1#6144)