Repository: flume
Updated Branches:
  refs/heads/trunk b5e5ba50f -> 5e9cfef2b


FLUME-2905. Fix NetcatSource file descriptor leak if startup fails

This patch fixes a file descriptor (socket) leak in NetcatSource that occurs
when binding the channel's socket to a local address fails during startup.

Reviewers: Attila Simon, Denes Arvay

(Siddharth Ahuja via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5e9cfef2
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5e9cfef2
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5e9cfef2

Branch: refs/heads/trunk
Commit: 5e9cfef2b26f1960601d08d571e4c85c269503af
Parents: b5e5ba5
Author: Siddharth Ahuja <[email protected]>
Authored: Fri Jun 30 14:01:15 2017 +0200
Committer: Denes Arvay <[email protected]>
Committed: Fri Jun 30 14:01:15 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flume/source/NetcatSource.java   |  7 ++--
 .../apache/flume/source/TestNetcatSource.java   | 34 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5e9cfef2/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
index 9513902..67e7e48 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
@@ -156,9 +156,6 @@ public class NetcatSource extends AbstractSource implements Configurable,
 
     counterGroup.incrementAndGet("open.attempts");
 
-    handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setNameFormat("netcat-handler-%d").build());
-
     try {
       SocketAddress bindPoint = new InetSocketAddress(hostName, port);
 
@@ -170,9 +167,13 @@ public class NetcatSource extends AbstractSource implements Configurable,
     } catch (IOException e) {
       counterGroup.incrementAndGet("open.errors");
       logger.error("Unable to bind to socket. Exception follows.", e);
+      stop();
       throw new FlumeException(e);
     }
 
+    handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setNameFormat("netcat-handler-%d").build());
+
     AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
     acceptThreadShouldStop.set(false);
     acceptRunnable.counterGroup = counterGroup;

http://git-wip-us.apache.org/repos/asf/flume/blob/5e9cfef2/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
index 99d413a..c1205c7 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatSource.java
@@ -25,6 +25,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
@@ -42,8 +43,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
@@ -305,6 +308,37 @@ public class TestNetcatSource {
     }
   }
 
+  /**
+   * Tests that the source is stopped when an exception is thrown
+   * on port bind attempt due to port already being in use.
+   *
+   * @throws InterruptedException
+   */
+  @Test
+  public void testSourceStoppedOnFlumeException() throws InterruptedException, 
IOException {
+    boolean isFlumeExceptionThrown = false;
+    // create a dummy socket bound to a known port.
+    try (ServerSocketChannel dummyServerSocket = ServerSocketChannel.open()) {
+      dummyServerSocket.socket().setReuseAddress(true);
+      dummyServerSocket.socket().bind(new InetSocketAddress("0.0.0.0", 10500));
+
+      Context context = new Context();
+      context.put("port", String.valueOf(10500));
+      context.put("bind", "0.0.0.0");
+      context.put("ack-every-event", "false");
+      Configurables.configure(source, context);
+
+      source.start();
+    } catch (FlumeException fe) {
+      isFlumeExceptionThrown = true;
+    }
+    // As port is already in use, an exception is thrown and the source is 
stopped
+    // cleaning up the opened sockets during source.start().
+    Assert.assertTrue("Flume exception is thrown as port already in use", 
isFlumeExceptionThrown);
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
   private void startSource(String encoding, String ack, String batchSize, 
String maxLineLength)
       throws InterruptedException {
     boolean bound = false;

Reply via email to