Repository: flume Updated Branches: refs/heads/trunk b54f085ff -> 964bcf56a
Facelift AvroSource and test using lambdas. - Make avro ip filter tests more reliable by checking whether the caught exception is really what the test expected. - Use lambdas instead of anonymous classes to make the code shorter. This closes #143. Reviewers: Denes Arvay (Attila Simon via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/964bcf56 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/964bcf56 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/964bcf56 Branch: refs/heads/trunk Commit: 964bcf56a54d38595f0ec74c484aa40d39732d9d Parents: b54f085 Author: Attila Simon <[email protected]> Authored: Thu Jun 29 09:21:56 2017 +0200 Committer: Denes Arvay <[email protected]> Committed: Tue Jul 4 13:20:59 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flume/source/AvroSource.java | 18 ++++-------------- .../org/apache/flume/source/TestAvroSource.java | 20 ++++++++++++++++---- 2 files changed, 20 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/964bcf56/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index e3467ec..623e61e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -253,14 +253,9 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, sourceCounter.start(); super.start(); final NettyServer srv = (NettyServer)server; - connectionCountUpdater.scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - sourceCounter.setOpenConnectionCount( - 
Long.valueOf(srv.getNumActiveConnections())); - } - }, 0, 60, TimeUnit.SECONDS); + connectionCountUpdater.scheduleWithFixedDelay( + () -> sourceCounter.setOpenConnectionCount(Long.valueOf(srv.getNumActiveConnections())), + 0, 60, TimeUnit.SECONDS); logger.info("Avro source {} started.", getName()); } @@ -292,12 +287,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, keystorePassword, keystoreType, enableIpFilter, patternRuleConfigDefinition); } else { - pipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(); - } - }; + pipelineFactory = Channels::pipeline; } return pipelineFactory; } http://git-wip-us.apache.org/repos/asf/flume/blob/964bcf56/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index 77fcb22..e7e2fab 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -30,6 +30,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.Executors; +import java.util.function.Consumer; + import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.TrustManager; @@ -479,11 +481,20 @@ public class TestAvroSource { doIpFilterTest(localhost, "deny:ip:" + localhost.getHostAddress().substring(0, 3) + "*,allow:ip:*", false, false); + + // Private lambda expression to check the received FlumeException within this test + Consumer<Exception> exceptionChecker = (Exception ex) -> { + logger.info("Received an expected exception", ex); + //covers all ipFilter related exceptions + Assert.assertTrue("Expected an ipFilterRules 
related exception", + ex.getMessage().contains("ipFilter")); + }; + try { doIpFilterTest(localhost, null, false, false); Assert.fail("The null ipFilterRules config should have thrown an exception."); } catch (FlumeException e) { - //Do nothing + exceptionChecker.accept(e); } try { @@ -491,20 +502,21 @@ public class TestAvroSource { Assert.fail("The empty string ipFilterRules config should have thrown " + "an exception"); } catch (FlumeException e) { - //Do nothing + exceptionChecker.accept(e); } try { doIpFilterTest(localhost, "homer:ip:45.4.23.1", true, false); Assert.fail("Bad ipFilterRules config should have thrown an exception."); } catch (FlumeException e) { - //Do nothing + exceptionChecker.accept(e); } + try { doIpFilterTest(localhost, "allow:sleeps:45.4.23.1", true, false); Assert.fail("Bad ipFilterRules config should have thrown an exception."); } catch (FlumeException e) { - //Do nothing + exceptionChecker.accept(e); } }
