FLUME-1713. Netcat source should allow for *not* returning "OK" upon receipt of each message.
(Rahul Ravindran via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/48085ced Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/48085ced Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/48085ced Branch: refs/heads/flume-1.3.0 Commit: 48085cedb0719a1dbf05c5d68d62cc8f11064b3c Parents: 54330dc Author: Mike Percy <[email protected]> Authored: Thu Nov 29 00:01:43 2012 -0800 Committer: Brock Noland <[email protected]> Committed: Thu Nov 29 10:01:31 2012 -0600 ---------------------------------------------------------------------- .../java/org/apache/flume/source/NetcatSource.java | 11 ++++++- .../source/NetcatSourceConfigurationConstants.java | 6 +++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + .../org/apache/flume/source/TestNetcatSource.java | 26 +++++++++++++- 4 files changed, 41 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java index 37c09fe..2da38bb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java @@ -112,6 +112,7 @@ public class NetcatSource extends AbstractSource implements Configurable, private String hostName; private int port; private int maxLineLength; + private boolean ackEveryEvent; private CounterGroup counterGroup; private ServerSocketChannel serverSocket; @@ -131,11 +132,13 @@ public class NetcatSource extends AbstractSource implements Configurable, public void configure(Context context) { String hostKey = NetcatSourceConfigurationConstants.CONFIG_HOSTNAME; String portKey = NetcatSourceConfigurationConstants.CONFIG_PORT; + String ackEventKey = NetcatSourceConfigurationConstants.CONFIG_ACKEVENT; Configurables.ensureRequiredNonNull(context, hostKey, portKey); hostName = context.getString(hostKey); port = context.getInteger(portKey); + ackEveryEvent = context.getBoolean(ackEventKey, true); maxLineLength = context.getInteger( NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH, NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH); @@ -170,6 +173,7 @@ public class NetcatSource extends AbstractSource implements Configurable, acceptRunnable.counterGroup = counterGroup; acceptRunnable.handlerService = handlerService; acceptRunnable.shouldStop = acceptThreadShouldStop; + acceptRunnable.ackEveryEvent = ackEveryEvent; acceptRunnable.source = this; acceptRunnable.serverSocket = serverSocket; @@ -246,6 +250,7 @@ public class NetcatSource extends AbstractSource implements Configurable, private ExecutorService handlerService; private EventDrivenSource source; private AtomicBoolean shouldStop; + private boolean ackEveryEvent; private final int maxLineLength; @@ -266,6 +271,7 @@ public class NetcatSource extends AbstractSource implements Configurable, request.socketChannel = socketChannel; request.counterGroup = counterGroup; request.source = source; + request.ackEveryEvent = ackEveryEvent; handlerService.submit(request); @@ -287,6 +293,7 @@ public class NetcatSource extends AbstractSource implements Configurable, private Source source; private CounterGroup counterGroup; private SocketChannel socketChannel; + private boolean ackEveryEvent; private final int maxLineLength; @@ -392,7 +399,9 @@ public class NetcatSource extends AbstractSource implements Configurable, if (ex == null) { counterGroup.incrementAndGet("events.processed"); numProcessed++; - writer.write("OK\n"); + if (true == ackEveryEvent) { + writer.write("OK\n"); + } } else { counterGroup.incrementAndGet("events.failed"); logger.warn("Error processing event. Exception follows.", ex); http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java index 1d8b5e4..fdf318a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java @@ -30,6 +30,12 @@ public class NetcatSourceConfigurationConstants { */ public static final String CONFIG_PORT = "port"; + + /** + * Ack every event received with an "OK" back to the sender + */ + public static final String CONFIG_ACKEVENT = "ack-every-event"; + /** * Maximum line length per event. */ http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index e56aaf9..464defe 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -778,6 +778,7 @@ Property Name Default Description **bind** -- Host name or IP address to bind to **port** -- Port # to bind to max-line-length 512 Max line length per event body (in bytes) +ack-every-event true Respond with an "OK" for every event received selector.type replicating replicating or multiplexing selector.* Depends on the selector.type value interceptors -- Space separated list of interceptors http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java index 3c17d3d..91fbf63 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.Collection; +import java.util.Arrays; import com.google.common.collect.Lists; import org.apache.flume.Channel; @@ -47,17 +49,32 @@ import org.apache.flume.lifecycle.LifecycleException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(value = Parameterized.class) public class TestNetcatSource { private Channel channel; private EventDrivenSource source; + private boolean ackEveryEvent; private static final Logger logger = LoggerFactory.getLogger(TestNetcatSource.class); + public TestNetcatSource(boolean ackForEveryEvent) { + ackEveryEvent = ackForEveryEvent; + } + + @Parameters + public static Collection data() { + Object[][] data = new Object[][] { { true }, { false } }; + return Arrays.asList(data); + } + @Before public void setUp() { logger.info("Running setup"); @@ -87,6 +104,7 @@ public class TestNetcatSource { Context context = new Context(); context.put("bind", "0.0.0.0"); context.put("port", "41414"); + context.put("ack-every-event", String.valueOf(ackEveryEvent)); Configurables.configure(source, context); @@ -112,8 +130,12 @@ public class TestNetcatSource { writer.write("Test message\n"); writer.flush(); - String response = reader.readLine(); - Assert.assertEquals("Server should return OK", "OK", response); + if (ackEveryEvent) { + String response = reader.readLine(); + Assert.assertEquals("Server should return OK", "OK", response); + } else { + Assert.assertFalse("Server should not return anything", reader.ready()); + } clientChannel.close(); } catch (IOException e) { logger.error("Caught exception: ", e);
