Repository: flume Updated Branches: refs/heads/trunk b5e5ba50f -> 5e9cfef2b
FLUME-2905. Fix NetcatSource file descriptor leak if startup fails This patch fixes the issue in NetcatSource which occurs if there is a problem while binding the channel's socket to a local address and leads to a file descriptor (socket) leak. Reviewers: Attila Simon, Denes Arvay (Siddharth Ahuja via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5e9cfef2 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5e9cfef2 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5e9cfef2 Branch: refs/heads/trunk Commit: 5e9cfef2b26f1960601d08d571e4c85c269503af Parents: b5e5ba5 Author: Siddharth Ahuja <[email protected]> Authored: Fri Jun 30 14:01:15 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Fri Jun 30 14:01:15 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flume/source/NetcatSource.java | 7 ++-- .../apache/flume/source/TestNetcatSource.java | 34 ++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5e9cfef2/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java index 9513902..67e7e48 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java @@ -156,9 +156,6 @@ public class NetcatSource extends AbstractSource implements Configurable, counterGroup.incrementAndGet("open.attempts"); - handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("netcat-handler-%d").build()); - try { SocketAddress bindPoint = new InetSocketAddress(hostName, port); @@ -170,9 +167,13 @@ public class NetcatSource extends AbstractSource implements Configurable, } catch (IOException e) { counterGroup.incrementAndGet("open.errors"); logger.error("Unable to bind to socket. Exception follows.", e); + stop(); throw new FlumeException(e); } + handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat("netcat-handler-%d").build()); + AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength); acceptThreadShouldStop.set(false); acceptRunnable.counterGroup = counterGroup; http://git-wip-us.apache.org/repos/asf/flume/blob/5e9cfef2/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java index 99d413a..c1205c7 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -25,6 +25,7 @@ import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; @@ -42,8 +43,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -305,6 +308,37 @@ public class TestNetcatSource { } } + /** + * Tests that the source is stopped when an exception is thrown + * on port bind attempt due to port already being in use. + * + * @throws InterruptedException + */ + @Test + public void testSourceStoppedOnFlumeException() throws InterruptedException, IOException { + boolean isFlumeExceptionThrown = false; + // create a dummy socket bound to a known port. + try (ServerSocketChannel dummyServerSocket = ServerSocketChannel.open()) { + dummyServerSocket.socket().setReuseAddress(true); + dummyServerSocket.socket().bind(new InetSocketAddress("0.0.0.0", 10500)); + + Context context = new Context(); + context.put("port", String.valueOf(10500)); + context.put("bind", "0.0.0.0"); + context.put("ack-every-event", "false"); + Configurables.configure(source, context); + + source.start(); + } catch (FlumeException fe) { + isFlumeExceptionThrown = true; + } + // As port is already in use, an exception is thrown and the source is stopped + // cleaning up the opened sockets during source.start(). + Assert.assertTrue("Flume exception is thrown as port already in use", isFlumeExceptionThrown); + Assert.assertEquals("Server is stopped", LifecycleState.STOP, + source.getLifecycleState()); + } + private void startSource(String encoding, String ack, String batchSize, String maxLineLength) throws InterruptedException { boolean bound = false;
