matt fisher created FLUME-2221:
----------------------------------

             Summary: NIO Worker threads remain open when connection fails
                 Key: FLUME-2221
                 URL: https://issues.apache.org/jira/browse/FLUME-2221
             Project: Flume
          Issue Type: Bug
          Components: Client SDK
    Affects Versions: v1.4.0
         Environment: Flume 1.4.0
Java 1.7.0_10
            Reporter: matt fisher


When LoadBalancingLog4jAppender loses its connection to Flume, each subsequent
attempt to log spawns NIO threads and leaves them open.  This comes from
NettyAvroRpcClient's attempt to instantiate a NettyTransceiver.  When the
transceiver fails to connect in its constructor, an exception is thrown, but
the NIO threads spawned by the NioClientSocketChannelFactory in the Boss and
Worker pools are never closed.  As a result, more threads are started on every
attempt to send a log event.

The code in question from NettyAvroRpcClient.java:
{code}
transceiver = new NettyTransceiver(this.address,
          new NioClientSocketChannelFactory(
        Executors.newCachedThreadPool(new TransceiverThreadFactory(
            "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
        Executors.newCachedThreadPool(new TransceiverThreadFactory(
            "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
          tu.toMillis(timeout));
{code}

Because all of the arguments are instantiated inline, no reference to the
channel factory or its thread pools remains from which to clean them up if the
NettyTransceiver constructor throws an exception.

I am proposing we explicitly instantiate these objects and keep references to
them so that, if the constructor of NettyTransceiver throws an exception, we
can clean up the threads that were spawned.

Something along the lines of:
{code}
    callTimeoutPool = Executors.newCachedThreadPool(
        new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
    // Declare the factory outside the try block so that it is accessible in
    // the catch block.
    NioClientSocketChannelFactory factory = null;
    try {
      // Instantiate the factory outside the NettyTransceiver constructor call
      // so that a reference to it survives a failed constructor.
      factory = new NioClientSocketChannelFactory(
        Executors.newCachedThreadPool(new TransceiverThreadFactory(
            "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
        Executors.newCachedThreadPool(new TransceiverThreadFactory(
            "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
      transceiver = new NettyTransceiver(this.address, factory,
          tu.toMillis(timeout));
      avroClient =
          SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
          transceiver);
    } catch (IOException ex) {
      /* Graceful shutdown (from
       * http://docs.jboss.org/netty/3.2/api/org/jboss/netty/channel/ChannelFactory.html):
       *
       * To shut down a network application service which is managed by a
       * factory, you should follow the following steps:
       *   close all channels created by the factory and their child channels,
       *   usually using ChannelGroup.close(), and
       *   call releaseExternalResources().
       */
      if (factory != null) {
        factory.releaseExternalResources();
      }
      throw new FlumeException(this + ": RPC connection error", ex);
    }
    setState(ConnState.READY);
  }
{code}
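
The same pattern can be tried out without Netty or Avro on the classpath. The
following is only a minimal sketch under stated assumptions: PoolCleanupSketch,
FakeTransceiver and connect are hypothetical names invented for illustration,
FakeTransceiver merely stands in for NettyTransceiver, and
ExecutorService.shutdownNow() stands in for
factory.releaseExternalResources(). It is not the Flume code itself.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;

// Hypothetical illustration class; not part of Flume, Avro, or Netty.
final class PoolCleanupSketch {

  /** Hypothetical stand-in for NettyTransceiver: uses the pools, then may fail. */
  static final class FakeTransceiver {
    FakeTransceiver(ExecutorService boss, ExecutorService worker,
        boolean reachable) throws IOException {
      // Submitting a task makes each cached pool start a thread, much as the
      // channel factory starts Boss and I/O Worker threads on connect.
      boss.execute(() -> { });
      worker.execute(() -> { });
      if (!reachable) {
        throw new IOException("connection refused");
      }
    }
  }

  /**
   * The caller creates the pools and passes them in, so references to them
   * still exist when the constructor throws and they can be shut down.
   */
  static FakeTransceiver connect(ExecutorService boss, ExecutorService worker,
      boolean reachable) {
    try {
      return new FakeTransceiver(boss, worker, reachable);
    } catch (IOException ex) {
      // Release the threads instead of leaking them on every failed attempt.
      boss.shutdownNow();
      worker.shutdownNow();
      throw new IllegalStateException("RPC connection error", ex);
    }
  }
}
```

Calling connect with pools from Executors.newCachedThreadPool() and
reachable set to false should leave both pools shut down, whereas
constructing the pools inline in the constructor call would leave no handle
with which to stop them.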



--
This message was sent by Atlassian JIRA
(v6.1#6144)
