FLUME-1713. Netcat source should allow for *not* returning "OK" upon receipt of 
each message.

(Rahul Ravindran via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/48085ced
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/48085ced
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/48085ced

Branch: refs/heads/flume-1.3.0
Commit: 48085cedb0719a1dbf05c5d68d62cc8f11064b3c
Parents: 54330dc
Author: Mike Percy <[email protected]>
Authored: Thu Nov 29 00:01:43 2012 -0800
Committer: Brock Noland <[email protected]>
Committed: Thu Nov 29 10:01:31 2012 -0600

----------------------------------------------------------------------
 .../java/org/apache/flume/source/NetcatSource.java |   11 ++++++-
 .../source/NetcatSourceConfigurationConstants.java |    6 +++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    1 +
 .../org/apache/flume/source/TestNetcatSource.java  |   26 +++++++++++++-
 4 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
index 37c09fe..2da38bb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
@@ -112,6 +112,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
   private String hostName;
   private int port;
   private int maxLineLength;
+  private boolean ackEveryEvent;
 
   private CounterGroup counterGroup;
   private ServerSocketChannel serverSocket;
@@ -131,11 +132,13 @@ public class NetcatSource extends AbstractSource 
implements Configurable,
   public void configure(Context context) {
     String hostKey = NetcatSourceConfigurationConstants.CONFIG_HOSTNAME;
     String portKey = NetcatSourceConfigurationConstants.CONFIG_PORT;
+    String ackEventKey = NetcatSourceConfigurationConstants.CONFIG_ACKEVENT;
 
     Configurables.ensureRequiredNonNull(context, hostKey, portKey);
 
     hostName = context.getString(hostKey);
     port = context.getInteger(portKey);
+    ackEveryEvent = context.getBoolean(ackEventKey, true);
     maxLineLength = context.getInteger(
         NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH,
         NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH);
@@ -170,6 +173,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
     acceptRunnable.counterGroup = counterGroup;
     acceptRunnable.handlerService = handlerService;
     acceptRunnable.shouldStop = acceptThreadShouldStop;
+    acceptRunnable.ackEveryEvent = ackEveryEvent;
     acceptRunnable.source = this;
     acceptRunnable.serverSocket = serverSocket;
 
@@ -246,6 +250,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
     private ExecutorService handlerService;
     private EventDrivenSource source;
     private AtomicBoolean shouldStop;
+    private boolean ackEveryEvent;
 
     private final int maxLineLength;
 
@@ -266,6 +271,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
           request.socketChannel = socketChannel;
           request.counterGroup = counterGroup;
           request.source = source;
+          request.ackEveryEvent = ackEveryEvent;
 
           handlerService.submit(request);
 
@@ -287,6 +293,7 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
     private Source source;
     private CounterGroup counterGroup;
     private SocketChannel socketChannel;
+    private boolean ackEveryEvent;
 
     private final int maxLineLength;
 
@@ -392,7 +399,9 @@ public class NetcatSource extends AbstractSource implements 
Configurable,
             if (ex == null) {
               counterGroup.incrementAndGet("events.processed");
               numProcessed++;
-              writer.write("OK\n");
+              if (true == ackEveryEvent) {
+                writer.write("OK\n");
+              }
             } else {
               counterGroup.incrementAndGet("events.failed");
               logger.warn("Error processing event. Exception follows.", ex);

http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
index 1d8b5e4..fdf318a 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
@@ -30,6 +30,12 @@ public class NetcatSourceConfigurationConstants {
    */
   public static final String CONFIG_PORT = "port";
 
+
+  /**
+   * Ack every event received with an "OK" back to the sender
+   */
+  public static final String CONFIG_ACKEVENT = "ack-every-event";
+
   /**
    * Maximum line length per event.
    */

http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index e56aaf9..464defe 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -778,6 +778,7 @@ Property Name    Default      Description
 **bind**         --           Host name or IP address to bind to
 **port**         --           Port # to bind to
 max-line-length  512          Max line length per event body (in bytes)
+ack-every-event  true         Respond with an "OK" for every event received
 selector.type    replicating  replicating or multiplexing
 selector.*                    Depends on the selector.type value
 interceptors     --           Space separated list of interceptors

http://git-wip-us.apache.org/repos/asf/flume/blob/48085ced/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java 
b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
index 3c17d3d..91fbf63 100644
--- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
+++ b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.Collection;
+import java.util.Arrays;
 
 import com.google.common.collect.Lists;
 import org.apache.flume.Channel;
@@ -47,17 +49,32 @@ import org.apache.flume.lifecycle.LifecycleException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(value = Parameterized.class)
 public class TestNetcatSource {
 
   private Channel channel;
   private EventDrivenSource source;
+  private boolean ackEveryEvent;
 
   private static final Logger logger =
       LoggerFactory.getLogger(TestNetcatSource.class);
 
+  public TestNetcatSource(boolean ackForEveryEvent) {
+    ackEveryEvent = ackForEveryEvent;
+  }
+
+  @Parameters
+  public static Collection data() {
+    Object[][] data = new Object[][] { { true }, { false } };
+   return Arrays.asList(data);
+  }
+
   @Before
   public void setUp() {
     logger.info("Running setup");
@@ -87,6 +104,7 @@ public class TestNetcatSource {
         Context context = new Context();
         context.put("bind", "0.0.0.0");
         context.put("port", "41414");
+        context.put("ack-every-event", String.valueOf(ackEveryEvent));
 
         Configurables.configure(source, context);
 
@@ -112,8 +130,12 @@ public class TestNetcatSource {
           writer.write("Test message\n");
           writer.flush();
 
-          String response = reader.readLine();
-          Assert.assertEquals("Server should return OK", "OK", response);
+          if (ackEveryEvent) {
+                String response = reader.readLine();
+               Assert.assertEquals("Server should return OK", "OK", response);
+          } else {
+                Assert.assertFalse("Server should not return anything", 
reader.ready());
+          }
           clientChannel.close();
         } catch (IOException e) {
           logger.error("Caught exception: ", e);

Reply via email to