Repository: flume Updated Branches: refs/heads/trunk d01dfd321 -> b5e5ba50f
FLUME-2752. Fix AvroSource startup resource leaks. - Clean up after Netty initialisation fails (call this.stop()) - Make sure this.stop() releases the resources and leaves the component in the LifecycleState.STOP state - Added JUnit test to cover the invalid host scenario - Added JUnit test to cover the used port scenario This closes #141. Reviewers: Denes Arvay (Attila Simon via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b5e5ba50 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b5e5ba50 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b5e5ba50 Branch: refs/heads/trunk Commit: b5e5ba50f4333272b9e2f2be2b32027e667f32e2 Parents: d01dfd3 Author: Attila Simon <[email protected]> Authored: Thu Jun 29 08:21:44 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Fri Jun 30 13:29:35 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flume/source/AvroSource.java | 55 +++--- .../org/apache/flume/source/TestAvroSource.java | 168 ++++++++++++------- 2 files changed, 140 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b5e5ba50/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index 762f690..e3467ec 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -20,7 +20,6 @@ package org.apache.flume.source; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import
org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.NettyTransceiver; @@ -156,6 +155,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, private boolean enableIpFilter; private String patternRuleConfigDefinition; + private NioServerSocketChannelFactory socketChannelFactory; private Server server; private SourceCounter sourceCounter; @@ -233,14 +233,20 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, public void start() { logger.info("Starting {}...", this); - Responder responder = new SpecificResponder(AvroSourceProtocol.class, this); + try { + Responder responder = new SpecificResponder(AvroSourceProtocol.class, this); - NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory(); + socketChannelFactory = initSocketChannelFactory(); - ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory(); + ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory(); - server = new NettyServer(responder, new InetSocketAddress(bindAddress, port), - socketChannelFactory, pipelineFactory, null); + server = new NettyServer(responder, new InetSocketAddress(bindAddress, port), + socketChannelFactory, pipelineFactory, null); + } catch (org.jboss.netty.channel.ChannelException nce) { + logger.error("Avro source {} startup failed. Cannot initialize Netty server", getName(), nce); + stop(); + throw new FlumeException("Failed to set up server socket", nce); + } connectionCountUpdater = Executors.newSingleThreadScheduledExecutor(); server.start(); @@ -300,28 +306,31 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, public void stop() { logger.info("Avro source {} stopping: {}", getName(), this); - server.close(); + if (server != null) { + server.close(); + try { + server.join(); + server = null; + } catch (InterruptedException e) { + logger.info("Avro source " + getName() + ": Interrupted while waiting " + + "for Avro server to stop. Exiting. 
Exception follows.", e); + Thread.currentThread().interrupt(); + } + } - try { - server.join(); - } catch (InterruptedException e) { - logger.info("Avro source " + getName() + ": Interrupted while waiting " + - "for Avro server to stop. Exiting. Exception follows.", e); + if (socketChannelFactory != null) { + socketChannelFactory.releaseExternalResources(); + socketChannelFactory = null; } + sourceCounter.stop(); - connectionCountUpdater.shutdown(); - while (!connectionCountUpdater.isTerminated()) { - try { - Thread.sleep(100); - } catch (InterruptedException ex) { - logger.error("Interrupted while waiting for connection count executor " - + "to terminate", ex); - Throwables.propagate(ex); - } + if (connectionCountUpdater != null) { + connectionCountUpdater.shutdownNow(); + connectionCountUpdater = null; } + super.stop(); - logger.info("Avro source {} stopped. Metrics: {}", getName(), - sourceCounter); + logger.info("Avro source {} stopped. Metrics: {}", getName(), sourceCounter); } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/b5e5ba50/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index d73e5ad..77fcb22 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -20,12 +20,12 @@ package org.apache.flume.source; import java.io.IOException; -import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.cert.X509Certificate; +import java.nio.channels.ServerSocketChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -52,7 +52,6 @@ import 
org.apache.flume.lifecycle.LifecycleState; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.flume.source.avro.AvroSourceProtocol; import org.apache.flume.source.avro.Status; -import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; @@ -97,20 +96,19 @@ public class TestAvroSource { boolean bound = false; for (int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); - - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - Configurables.configure(source, context); + Context context = new Context(); + context.put("port", String.valueOf(selectedPort = 41414 + i)); + context.put("bind", "0.0.0.0"); + // Invalid configuration may throw a FlumeException which has to be expected in the callers + Configurables.configure(source, context); + try { source.start(); bound = true; - } catch (ChannelException e) { + } catch (FlumeException e) { /* - * NB: This assume we're using the Netty server under the hood and the - * failure is to bind. Yucky. + * NB: This assume the failure is to bind. */ } } @@ -129,6 +127,62 @@ public class TestAvroSource { } @Test + public void testSourceStoppedOnFlumeExceptionIfPortUsed() + throws InterruptedException, IOException { + final String loopbackIPv4 = "127.0.0.1"; + final int port = 10500; + + // create a dummy socket bound to a known port. 
+ try (ServerSocketChannel dummyServerSocket = ServerSocketChannel.open()) { + dummyServerSocket.socket().setReuseAddress(true); + dummyServerSocket.socket().bind(new InetSocketAddress(loopbackIPv4, port)); + + Context context = new Context(); + context.put("port", String.valueOf(port)); + context.put("bind", loopbackIPv4); + Configurables.configure(source, context); + try { + source.start(); + Assert.fail("Expected an exception during startup caused by binding on a used port"); + } catch (FlumeException e) { + logger.info("Received an expected exception.", e); + Assert.assertTrue("Expected a server socket setup related root cause", + e.getMessage().contains("server socket")); + } + } + // As port is already in use, an exception is thrown and the source is stopped + // cleaning up the opened sockets during source.start(). + Assert.assertEquals("Server is stopped", LifecycleState.STOP, + source.getLifecycleState()); + } + + @Test + public void testInvalidAddress() + throws InterruptedException, IOException { + final String invalidHost = "invalid.host"; + final int port = 10501; + + Context context = new Context(); + context.put("port", String.valueOf(port)); + context.put("bind", invalidHost); + Configurables.configure(source, context); + + try { + source.start(); + Assert.fail("Expected an exception during startup caused by binding on a invalid host"); + } catch (FlumeException e) { + logger.info("Received an expected exception.", e); + Assert.assertTrue("Expected a server socket setup related root cause", + e.getMessage().contains("server socket")); + } + + // As port is already in use, an exception is thrown and the source is stopped + // cleaning up the opened sockets during source.start(). 
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP, + source.getLifecycleState()); + } + + @Test public void testRequestWithNoCompression() throws InterruptedException, IOException { doRequest(false, false, 6); @@ -179,25 +233,22 @@ public class TestAvroSource { boolean bound = false; for (int i = 0; i < 100 && !bound; i++) { + Context context = new Context(); + context.put("port", String.valueOf(selectedPort = 41414 + i)); + context.put("bind", "0.0.0.0"); + context.put("threads", "50"); + if (serverEnableCompression) { + context.put("compression-type", "deflate"); + } else { + context.put("compression-type", "none"); + } + Configurables.configure(source, context); try { - Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - context.put("threads", "50"); - if (serverEnableCompression) { - context.put("compression-type", "deflate"); - } else { - context.put("compression-type", "none"); - } - - Configurables.configure(source, context); - source.start(); bound = true; - } catch (ChannelException e) { + } catch (FlumeException e) { /* - * NB: This assume we're using the Netty server under the hood and the - * failure is to bind. Yucky. + * NB: This assume the failure is to bind. 
*/ } } @@ -282,24 +333,21 @@ public class TestAvroSource { boolean bound = false; for (int i = 0; i < 10 && !bound; i++) { + Context context = new Context(); + + context.put("port", String.valueOf(selectedPort = 41414 + i)); + context.put("bind", "0.0.0.0"); + context.put("ssl", "true"); + context.put("keystore", "src/test/resources/server.p12"); + context.put("keystore-password", "password"); + context.put("keystore-type", "PKCS12"); + Configurables.configure(source, context); try { - Context context = new Context(); - - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - context.put("ssl", "true"); - context.put("keystore", "src/test/resources/server.p12"); - context.put("keystore-password", "password"); - context.put("keystore-type", "PKCS12"); - - Configurables.configure(source, context); - source.start(); bound = true; - } catch (ChannelException e) { + } catch (FlumeException e) { /* - * NB: This assume we're using the Netty server under the hood and the - * failure is to bind. Yucky. + * NB: This assume the failure is to bind. 
*/ Thread.sleep(100); } @@ -466,30 +514,30 @@ public class TestAvroSource { boolean bound = false; for (int i = 0; i < 100 && !bound; i++) { - try { - Context context = new Context(); - context.put("port", String.valueOf(selectedPort = 41414 + i)); - context.put("bind", "0.0.0.0"); - context.put("ipFilter", "true"); - if (ruleDefinition != null) { - context.put("ipFilterRules", ruleDefinition); - } - if (testWithSSL) { - logger.info("Client testWithSSL" + testWithSSL); - context.put("ssl", "true"); - context.put("keystore", "src/test/resources/server.p12"); - context.put("keystore-password", "password"); - context.put("keystore-type", "PKCS12"); - } - - Configurables.configure(source, context); + Context context = new Context(); + context.put("port", String.valueOf(selectedPort = 41414 + i)); + context.put("bind", "0.0.0.0"); + context.put("ipFilter", "true"); + if (ruleDefinition != null) { + context.put("ipFilterRules", ruleDefinition); + } + if (testWithSSL) { + logger.info("Client testWithSSL" + testWithSSL); + context.put("ssl", "true"); + context.put("keystore", "src/test/resources/server.p12"); + context.put("keystore-password", "password"); + context.put("keystore-type", "PKCS12"); + } + // Invalid configuration may result in a FlumeException + Configurables.configure(source, context); + + try { source.start(); bound = true; - } catch (ChannelException e) { + } catch (FlumeException e) { /* - * NB: This assume we're using the Netty server under the hood and the - * failure is to bind. Yucky. + * NB: This assume the failure is to bind. */ Thread.sleep(100); }
