Repository: flume
Updated Branches:
  refs/heads/trunk 5e9cfef2b -> ef54f9fd6


FLUME-2917. Provide netcat UDP source as alternative to TCP

This patch adds a netcat UDP source.

Reviewers: Lior Zeno, Chris Horrocks, Bessenyei Balázs Donát

(Tristan Stevens via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ef54f9fd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ef54f9fd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ef54f9fd

Branch: refs/heads/trunk
Commit: ef54f9fd6372143adc8c6a66f174feba381fec95
Parents: 5e9cfef
Author: Tristan Stevens <[email protected]>
Authored: Sun Jul 2 21:31:27 2017 +0000
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Sun Jul 2 21:31:27 2017 +0000

----------------------------------------------------------------------
 .../flume/conf/source/SourceConfiguration.java  |   9 +-
 .../apache/flume/conf/source/SourceType.java    |   9 +-
 .../apache/flume/source/NetcatUdpSource.java    | 188 +++++++++++++++++++
 .../flume/source/TestDefaultSourceFactory.java  |   1 +
 .../flume/source/TestNetcatUdpSource.java       | 177 +++++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  38 +++-
 6 files changed, 418 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
index 6bd1489..201d716 100644
--- 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
+++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java
@@ -214,7 +214,14 @@ public class SourceConfiguration extends 
ComponentConfiguration {
      *
      * @see org.apache.flume.source.taildir.TaildirSource
      */
-    TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration")
+    TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration"),
+
+    /**
+     * Netcat UDP Source
+     *
+     * @see org.apache.flume.source.NetcatUdpSource
+     */
+    NETCATUDP("org.apache.flume.conf.source.NetcatUdpSourceConfiguration")
     ;
 
     private String srcConfigurationName;

http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
index 4f4073a..3e7e7be 100644
--- 
a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
+++ 
b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java
@@ -110,7 +110,14 @@ public enum SourceType {
    *
    * @see org.apache.flume.source.taildir.TaildirSource
    */
-  TAILDIR("org.apache.flume.source.taildir.TaildirSource")
+  TAILDIR("org.apache.flume.source.taildir.TaildirSource"),
+
+  /**
+   * Netcat UDP Source
+   *
+   * @see org.apache.flume.source.NetcatUdpSource
+   */
+  NETCATUDP("org.apache.flume.source.NetcatUdpSource")
   ;
 
   private final String sourceClassName;

http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java
new file mode 100644
index 0000000..38198a6
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.CounterGroup;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A netcat-like source that receives UDP datagrams through a Netty
+ * {@link ConnectionlessBootstrap} and hands them to the channel processor.
+ *
+ * <p>Each received datagram produces exactly one event: the body is the
+ * datagram content up to (not including) the first {@code '\n'}; any bytes
+ * after that newline are discarded. The sender's address is stored in an
+ * event header.
+ *
+ * <p>Configuration keys read by {@link #configure(Context)}: {@code port}
+ * (required), {@code bind} (optional) and {@code remoteAddress} (optional).
+ */
+public class NetcatUdpSource extends AbstractSource
+      implements EventDrivenSource, Configurable {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(NetcatUdpSource.class);
+
+  // Lower bound and initial value given to the receive-buffer size predictor.
+  private static final int DEFAULT_MIN_SIZE = 2048;
+  private static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE;
+  private static final String REMOTE_ADDRESS_HEADER = "remoteAddress";
+  private static final String CONFIG_PORT = "port";
+  private static final String CONFIG_HOST = "bind";
+  // Header name used when the configuration does not supply one.
+  private static final String DEFAULT_REMOTE_HOST_HEADER = "REMOTE_ADDRESS";
+
+  private int port;
+  // Upper bound given to the receive-buffer size predictor.
+  private int maxsize = 1 << 16; // 64k
+  private String host = null;
+  private Channel nettyChannel;
+  private String remoteHostHeader = DEFAULT_REMOTE_HOST_HEADER;
+
+  private CounterGroup counterGroup = new CounterGroup();
+
+  /**
+   * Netty handler that converts each received datagram into a Flume event
+   * and passes it to the enclosing source's channel processor.
+   */
+  public class NetcatHandler extends SimpleChannelHandler {
+
+    /**
+     * Builds an event from the first line of the datagram.
+     *
+     * @param in buffer holding the datagram payload
+     * @param remoteAddress address of the sender, stored as a header
+     * @return an event whose body is the payload up to the first '\n'
+     */
+    private Event extractEvent(ChannelBuffer in, SocketAddress remoteAddress) {
+      Map<String, String> headers = new HashMap<String,String>();
+      headers.put(remoteHostHeader, remoteAddress.toString());
+
+      ByteArrayOutputStream body = new ByteArrayOutputStream();
+      while (in.readable()) {
+        byte b = in.readByte();
+        // Entries are separated by '\n'; only the first entry is used.
+        if (b == '\n') {
+          break;
+        }
+        body.write(b);
+      }
+
+      return EventBuilder.withBody(body.toByteArray(), headers);
+    }
+
+    /**
+     * Processes one datagram. Failures are counted under
+     * {@code events.dropped} and logged rather than propagated.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx,
+                                MessageEvent mEvent) {
+      try {
+        Event e = extractEvent((ChannelBuffer) mEvent.getMessage(),
+            mEvent.getRemoteAddress());
+        getChannelProcessor().processEvent(e);
+        counterGroup.incrementAndGet("events.success");
+      } catch (ChannelException ex) {
+        counterGroup.incrementAndGet("events.dropped");
+        logger.error("Error writing to channel", ex);
+      } catch (RuntimeException ex) {
+        counterGroup.incrementAndGet("events.dropped");
+        logger.error("Error retrieving event from udp stream, event dropped",
+            ex);
+      }
+    }
+  }
+
+  /**
+   * Binds the UDP listener on the configured host/port (all local addresses
+   * when no host is configured) and then marks the source as started.
+   */
+  @Override
+  public void start() {
+    // setup Netty server
+    ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(
+        new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
+    final NetcatHandler handler = new NetcatHandler();
+    serverBootstrap.setOption("receiveBufferSizePredictorFactory",
+        new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE,
+        DEFAULT_INITIAL_SIZE, maxsize));
+    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(handler);
+      }
+    });
+
+    InetSocketAddress bindAddress = (host == null)
+        ? new InetSocketAddress(port)
+        : new InetSocketAddress(host, port);
+    nettyChannel = serverBootstrap.bind(bindAddress);
+
+    super.start();
+  }
+
+  /**
+   * Closes the listening channel, waiting up to 60 seconds for the close to
+   * complete, and then marks the source as stopped.
+   */
+  @Override
+  public void stop() {
+    logger.info("Netcat UDP Source stopping...");
+    logger.info("Metrics:{}", counterGroup);
+    if (nettyChannel != null) {
+      nettyChannel.close();
+      try {
+        nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("netty server stop interrupted", e);
+        // Preserve the interrupt so callers further up can react to it.
+        Thread.currentThread().interrupt();
+      } finally {
+        nettyChannel = null;
+      }
+    }
+
+    super.stop();
+  }
+
+  /**
+   * Reads the source settings from the given context.
+   *
+   * <p>NOTE(review): the user guide section added with this source lists the
+   * header property as {@code remoteAddressHeader}, while the key read here
+   * is {@code remoteAddress} -- confirm which name is intended.
+   *
+   * @param context configuration; must contain {@code port}
+   */
+  @Override
+  public void configure(Context context) {
+    Configurables.ensureRequiredNonNull(
+        context, CONFIG_PORT);
+    port = context.getInteger(CONFIG_PORT);
+    host = context.getString(CONFIG_HOST);
+    // Fall back to the default name; a null here would become a null header key.
+    remoteHostHeader = context.getString(REMOTE_ADDRESS_HEADER,
+        DEFAULT_REMOTE_HOST_HEADER);
+  }
+
+  /**
+   * Returns the port the listener is actually bound to.
+   *
+   * @return the bound port, or 0 if the source is not currently bound or the
+   *         local address is not an {@link InetSocketAddress}
+   */
+  @VisibleForTesting
+  public int getSourcePort() {
+    Channel channel = nettyChannel;
+    if (channel == null) {
+      return 0;
+    }
+    SocketAddress localAddress = channel.getLocalAddress();
+    if (localAddress instanceof InetSocketAddress) {
+      return ((InetSocketAddress) localAddress).getPort();
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
index baa8500..97e024f 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
@@ -66,6 +66,7 @@ public class TestDefaultSourceFactory {
   public void testSourceCreation() throws Exception {
     verifySourceCreation("seq-src", "seq", SequenceGeneratorSource.class);
     verifySourceCreation("netcat-src", "netcat", NetcatSource.class);
+    verifySourceCreation("netcat-udp-src", "netcatudp", NetcatUdpSource.class);
     verifySourceCreation("exec-src", "exec", ExecSource.class);
     verifySourceCreation("avro-src", "avro", AvroSource.class);
     verifySourceCreation("syslogtcp-src", "syslogtcp", SyslogTcpSource.class);

http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java
new file mode 100644
index 0000000..03740e9
--- /dev/null
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.DatagramSocket;
+import com.google.common.base.Charsets;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link NetcatUdpSource}: datagrams sent to the source must show
+ * up unchanged as event bodies on the attached memory channel.
+ */
+public class TestNetcatUdpSource {
+  private static final org.slf4j.Logger logger =
+      LoggerFactory.getLogger(TestNetcatUdpSource.class);
+  // Port 0 is passed to the source; the tests then ask it for the bound port.
+  private static final int TEST_NETCAT_PORT = 0;
+  // Number of datagrams sent, and events expected, per test.
+  private static final int EVENT_COUNT = 10;
+  private NetcatUdpSource source;
+  private Channel channel;
+  private final String shortString = "Lorem ipsum dolor sit amet.";
+  private final String mediumString =
+      "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " +
+      "Nunc maximus rhoncus viverra. Nunc a metus.";
+
+  /** Creates a configured source wired to a fresh memory channel. */
+  private void init() {
+    source = new NetcatUdpSource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    Context context = new Context();
+    context.put("port", String.valueOf(TEST_NETCAT_PORT));
+
+    source.configure(context);
+  }
+
+  /**
+   * Starts the source, sends {@code payload} in {@link #EVENT_COUNT} separate
+   * datagrams, and asserts that the same number of events with an identical
+   * body can be taken from the channel. The source is stopped even when an
+   * assertion fails.
+   */
+  private void sendAndVerify(byte[] payload) throws IOException {
+    init();
+    source.start();
+    List<Event> channelEvents = new ArrayList<Event>();
+    try {
+      // Write the message to the port the source is bound to
+      DatagramPacket datagramPacket = new DatagramPacket(payload,
+          payload.length, InetAddress.getLocalHost(), source.getSourcePort());
+      for (int i = 0; i < EVENT_COUNT; i++) {
+        DatagramSocket socket = new DatagramSocket();
+        try {
+          socket.send(datagramPacket);
+        } finally {
+          socket.close();
+        }
+      }
+
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      boolean committed = false;
+      try {
+        for (int i = 0; i < EVENT_COUNT; i++) {
+          Event e = channel.take();
+          Assert.assertNotNull(e);
+          channelEvents.add(e);
+        }
+        txn.commit();
+        committed = true;
+      } finally {
+        // Roll back on any failure so the error surfaces instead of being hidden.
+        if (!committed) {
+          txn.rollback();
+        }
+        txn.close();
+      }
+    } finally {
+      source.stop();
+    }
+
+    for (Event e : channelEvents) {
+      Assert.assertNotNull(e);
+      logger.info(new String(e.getBody(), Charsets.UTF_8));
+      Assert.assertArrayEquals(payload, e.getBody());
+    }
+  }
+
+  /** Sends the given text as UTF-8 and verifies it round-trips. */
+  private void runUdpTest(String data1) throws IOException {
+    sendAndVerify(data1.getBytes(Charsets.UTF_8));
+  }
+
+  @Test
+  public void testLargePayload() throws Exception {
+    sendAndVerify(getPayload(1000).getBytes(Charsets.UTF_8));
+  }
+
+  @Test
+  public void testShortString() throws IOException {
+    runUdpTest(shortString);
+  }
+
+  @Test
+  public void testMediumString() throws IOException {
+    runUdpTest(mediumString);
+  }
+
+  /** Returns a string of {@code length} 'x' characters. */
+  private String getPayload(int length) {
+    StringBuilder payload = new StringBuilder(length);
+    for (int n = 0; n < length; ++n) {
+      payload.append("x");
+    }
+    return payload.toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2073bf6..209db48 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1475,8 +1475,8 @@ Also please make sure that the operating system user of 
the Flume processes has
     };
 
 
-NetCat Source
-~~~~~~~~~~~~~
+NetCat TCP Source
+~~~~~~~~~~~~~~~~~
 
 A netcat-like source that listens on a given port and turns each line of text
 into an event. Acts like ``nc -k -l [host] [port]``. In other words,
@@ -1512,6 +1512,40 @@ Example for agent named a1:
   a1.sources.r1.port = 6666
   a1.sources.r1.channels = c1
 
+NetCat UDP Source
+~~~~~~~~~~~~~~~~~
+
+As per the original Netcat (TCP) source, this source listens on a given port,
+turns each line of text into an event, and sends it via the connected channel.
+Acts like ``nc -u -k -l [host] [port]``.
+
+Required properties are in **bold**.
+
+==================  ===========  ===========================================
+Property Name       Default      Description
+==================  ===========  ===========================================
+**channels**        --
+**type**            --           The component type name, needs to be 
``netcatudp``
+**bind**            --           Host name or IP address to bind to
+**port**            --           Port # to bind to
+remoteAddressHeader --
+selector.type       replicating  replicating or multiplexing
+selector.*                       Depends on the selector.type value
+interceptors        --           Space-separated list of interceptors
+interceptors.*
+==================  ===========  ===========================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = netcatudp
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = 6666
+  a1.sources.r1.channels = c1
+
 Sequence Generator Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~
 

Reply via email to