Repository: flume Updated Branches: refs/heads/trunk 5e9cfef2b -> ef54f9fd6
FLUME-2917. Provide netcat UDP source as alternative to TCP This patch adds a netcat UDP source. Reviewers: Lior Zeno, Chris Horrocks, Bessenyei Balázs Donát (Tristan Stevens via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ef54f9fd Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ef54f9fd Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ef54f9fd Branch: refs/heads/trunk Commit: ef54f9fd6372143adc8c6a66f174feba381fec95 Parents: 5e9cfef Author: Tristan Stevens <[email protected]> Authored: Sun Jul 2 21:31:27 2017 +0000 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Sun Jul 2 21:31:27 2017 +0000 ---------------------------------------------------------------------- .../flume/conf/source/SourceConfiguration.java | 9 +- .../apache/flume/conf/source/SourceType.java | 9 +- .../apache/flume/source/NetcatUdpSource.java | 188 +++++++++++++++++++ .../flume/source/TestDefaultSourceFactory.java | 1 + .../flume/source/TestNetcatUdpSource.java | 177 +++++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 38 +++- 6 files changed, 418 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java index 6bd1489..201d716 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceConfiguration.java @@ -214,7 +214,14 @@ public class SourceConfiguration extends ComponentConfiguration { * 
* @see org.apache.flume.source.taildir.TaildirSource */ - TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration") + TAILDIR("org.apache.flume.source.taildir.TaildirSourceConfiguration"), + + /** + * Netcat UDP Source + * + * @see org.apache.flume.source.NetcatUdpSource + */ + NETCATUDP("org.apache.flume.conf.source.NetcatUdpSourceConfiguration") ; private String srcConfigurationName; http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java ---------------------------------------------------------------------- diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java index 4f4073a..3e7e7be 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/source/SourceType.java @@ -110,7 +110,14 @@ public enum SourceType { * * @see org.apache.flume.source.taildir.TaildirSource */ - TAILDIR("org.apache.flume.source.taildir.TaildirSource") + TAILDIR("org.apache.flume.source.taildir.TaildirSource"), + + /** + * Netcat UDP Source + * + * @see org.apache.flume.source.NetcatUdpSource + */ + NETCATUDP("org.apache.flume.source.NetcatUdpSource") ; private final String sourceClassName; http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java new file mode 100644 index 0000000..38198a6 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/source/NetcatUdpSource.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source; + +import java.io.ByteArrayOutputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.CounterGroup; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; +import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; + 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A netcat-style source that listens for UDP datagrams and hands each one to
+ * the channel processor as a single Flume event.
+ *
+ * <p>The event body is the datagram payload up to, but not including, the
+ * first {@code '\n'}; any bytes that follow that newline are discarded.
+ *
+ * <p>Properties read by {@link #configure(Context)}:
+ * <ul>
+ * <li>{@code port} (required) - UDP port to bind to</li>
+ * <li>{@code bind} - host name or address to bind to; when it is absent only
+ * the port is passed to the socket address</li>
+ * <li>{@code remoteAddress} - name of the event header that receives the
+ * sender's address; when it is absent no such header is added</li>
+ * </ul>
+ */
+public class NetcatUdpSource extends AbstractSource
+    implements EventDrivenSource, Configurable {
+
+  private int port;
+  // Upper bound handed to the adaptive receive-buffer predictor in start().
+  private int maxsize = 1 << 16; // 64k
+  private String host = null;
+  // Bound datagram channel; non-null only between start() and stop().
+  private Channel nettyChannel;
+  private String remoteHostHeader = "REMOTE_ADDRESS";
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(NetcatUdpSource.class);
+
+  // Holds the "events.success" and "events.dropped" counters logged by stop().
+  private CounterGroup counterGroup = new CounterGroup();
+
+  // Default Min size
+  private static final int DEFAULT_MIN_SIZE = 2048;
+  private static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE;
+  // Keys looked up in the Context passed to configure().
+  private static final String REMOTE_ADDRESS_HEADER = "remoteAddress";
+  private static final String CONFIG_PORT = "port";
+  private static final String CONFIG_HOST = "bind";
+
+  /**
+   * Netty handler that converts every received datagram into a Flume event
+   * and passes it to this source's channel processor.
+   */
+  public class NetcatHandler extends SimpleChannelHandler {
+
+    /**
+     * Builds an event from the first line of a datagram.
+     *
+     * @param in buffer holding the datagram payload; it is read up to and
+     *     including the first {@code '\n'}, or to its end if there is none
+     * @param remoteAddress address of the sender, stored via
+     *     {@code toString()} under the configured header name
+     * @return the event built from the bytes before the first newline
+     */
+    private Event extractEvent(ChannelBuffer in, SocketAddress remoteAddress) {
+      Map<String, String> headers = new HashMap<String, String>();
+
+      // configure() leaves remoteHostHeader null when the property is not
+      // set; skip the header then instead of storing it under a null key.
+      if (remoteHostHeader != null) {
+        headers.put(remoteHostHeader, remoteAddress.toString());
+      }
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      while (in.readable()) {
+        byte b = in.readByte();
+        // Entries are separated by '\n'
+        if (b == '\n') {
+          break;
+        }
+        baos.write(b);
+      }
+
+      return EventBuilder.withBody(baos.toByteArray(), headers);
+    }
+
+    /**
+     * Turns the received datagram into an event and processes it. Failures
+     * are logged and counted under "events.dropped" rather than rethrown.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
+      try {
+        Event e = extractEvent((ChannelBuffer) mEvent.getMessage(),
+            mEvent.getRemoteAddress());
+        if (e == null) {
+          return;
+        }
+        getChannelProcessor().processEvent(e);
+        counterGroup.incrementAndGet("events.success");
+      } catch (ChannelException ex) {
+        counterGroup.incrementAndGet("events.dropped");
+        logger.error("Error writing to channel", ex);
+      } catch (RuntimeException ex) {
+        counterGroup.incrementAndGet("events.dropped");
+        logger.error("Error retrieving event from udp stream, event dropped", ex);
+      }
+    }
+  }
+
+  /**
+   * Binds the UDP socket on the configured host and port and starts
+   * delivering datagrams to a {@link NetcatHandler}.
+   */
+  @Override
+  public void start() {
+    // setup Netty server
+    ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(
+        new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
+    final NetcatHandler handler = new NetcatHandler();
+    serverBootstrap.setOption("receiveBufferSizePredictorFactory",
+        new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE,
+            DEFAULT_INITIAL_SIZE, maxsize));
+    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        return Channels.pipeline(handler);
+      }
+    });
+
+    if (host == null) {
+      nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
+    } else {
+      nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
+    }
+
+    super.start();
+  }
+
+  /**
+   * Closes the datagram channel, waiting up to 60 seconds for the close to
+   * complete, and logs the collected counters.
+   */
+  @Override
+  public void stop() {
+    logger.info("Netcat UDP Source stopping...");
+    logger.info("Metrics:{}", counterGroup);
+    if (nettyChannel != null) {
+      nettyChannel.close();
+      try {
+        nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("netty server stop interrupted", e);
+        // Restore the interrupt flag so the calling thread can still see it.
+        Thread.currentThread().interrupt();
+      } finally {
+        nettyChannel = null;
+      }
+    }
+
+    super.stop();
+  }
+
+  /**
+   * Reads the port (required), the bind host and the remote-address header
+   * name from the given context.
+   *
+   * @param context configuration of this source; must contain {@code port}
+   */
+  @Override
+  public void configure(Context context) {
+    Configurables.ensureRequiredNonNull(
+        context, CONFIG_PORT);
+    port = context.getInteger(CONFIG_PORT);
+    host = context.getString(CONFIG_HOST);
+    remoteHostHeader = context.getString(REMOTE_ADDRESS_HEADER);
+  }
+
+  /**
+   * Returns the local port of the bound channel.
+   *
+   * @return the bound port, or 0 when the source is not started or the local
+   *     address is not an {@link InetSocketAddress}
+   */
+  @VisibleForTesting
+  public int getSourcePort() {
+    Channel channel = nettyChannel;
+    if (channel == null) {
+      // Not started yet, or already stopped.
+      return 0;
+    }
+    SocketAddress localAddress = channel.getLocalAddress();
+    if (localAddress instanceof InetSocketAddress) {
+      InetSocketAddress addr = (InetSocketAddress) localAddress;
+      return addr.getPort();
+    }
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java index baa8500..97e024f 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java @@ -66,6 +66,7 @@ public class TestDefaultSourceFactory { public void testSourceCreation() throws Exception { verifySourceCreation("seq-src", "seq", SequenceGeneratorSource.class); verifySourceCreation("netcat-src", "netcat", NetcatSource.class); + verifySourceCreation("netcat-udp-src", "netcatudp", NetcatUdpSource.class); verifySourceCreation("exec-src", "exec", ExecSource.class); verifySourceCreation("avro-src", "avro", AvroSource.class); verifySourceCreation("syslogtcp-src", "syslogtcp", SyslogTcpSource.class); http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java new file mode 100644 index 0000000..03740e9 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestNetcatUdpSource.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.DatagramSocket;
+import com.google.common.base.Charsets;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends UDP datagrams to a {@link NetcatUdpSource} backed by a memory channel
+ * and checks that every datagram arrives as an event with an identical body.
+ */
+public class TestNetcatUdpSource {
+  private static final org.slf4j.Logger logger =
+      LoggerFactory.getLogger(TestNetcatUdpSource.class);
+  private NetcatUdpSource source;
+  private Channel channel;
+  // The source is configured with port 0; the port actually bound is read
+  // back through getSourcePort() once the source has started.
+  private static final int TEST_NETCAT_PORT = 0;
+  // Number of identical datagrams sent, and events expected, per test.
+  private static final int DATAGRAM_COUNT = 10;
+  private final String shortString = "Lorem ipsum dolor sit amet.";
+  private final String mediumString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. "
+      + "Nunc maximus rhoncus viverra. Nunc a metus.";
+
+  /** Creates the source, wires it to a fresh memory channel and configures the port. */
+  private void init() {
+    source = new NetcatUdpSource();
+    channel = new MemoryChannel();
+
+    Configurables.configure(channel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    Context context = new Context();
+    context.put("port", String.valueOf(TEST_NETCAT_PORT));
+
+    source.configure(context);
+  }
+
+  /**
+   * Sends the given text as UTF-8 datagrams and verifies the resulting events.
+   *
+   * @param data1 text to send; must not contain a newline, because the source
+   *     ends the event body at the first newline
+   */
+  private void runUdpTest(String data1) throws IOException {
+    sendAndVerify(data1.getBytes(Charsets.UTF_8));
+  }
+
+  /**
+   * Starts the source, sends {@code payload} {@link #DATAGRAM_COUNT} times and
+   * asserts that the same number of events with an identical body can be
+   * taken from the channel. The source is stopped even when an assertion fails.
+   */
+  private void sendAndVerify(byte[] payload) throws IOException {
+    init();
+    source.start();
+    try {
+      // Write some message to the netcat port
+      DatagramPacket datagramPacket = new DatagramPacket(payload, payload.length,
+          InetAddress.getLocalHost(), source.getSourcePort());
+      for (int i = 0; i < DATAGRAM_COUNT; i++) {
+        DatagramSocket socket = new DatagramSocket();
+        try {
+          socket.send(datagramPacket);
+        } finally {
+          socket.close();
+        }
+      }
+
+      for (Event e : takeEvents(DATAGRAM_COUNT)) {
+        Assert.assertNotNull(e);
+        logger.info(new String(e.getBody(), Charsets.UTF_8));
+        Assert.assertArrayEquals(payload, e.getBody());
+      }
+    } finally {
+      source.stop();
+    }
+  }
+
+  /**
+   * Takes {@code count} events from the channel inside one transaction. The
+   * transaction is rolled back if a take, an assertion or the commit fails,
+   * and it is always closed.
+   */
+  private List<Event> takeEvents(int count) {
+    List<Event> channelEvents = new ArrayList<Event>();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    boolean committed = false;
+    try {
+      for (int i = 0; i < count; i++) {
+        Event e = channel.take();
+        Assert.assertNotNull(e);
+        channelEvents.add(e);
+      }
+      txn.commit();
+      committed = true;
+    } finally {
+      try {
+        if (!committed) {
+          txn.rollback();
+        }
+      } finally {
+        txn.close();
+      }
+    }
+    return channelEvents;
+  }
+
+  @Test
+  public void testLargePayload() throws Exception {
+    sendAndVerify(getPayload(1000).getBytes(Charsets.UTF_8));
+  }
+
+  @Test
+  public void testShortString() throws IOException {
+    runUdpTest(shortString);
+  }
+
+  @Test
+  public void testMediumString() throws IOException {
+    runUdpTest(mediumString);
+  }
+
+  /** Returns a string made of {@code length} repetitions of the character 'x'. */
+  private String getPayload(int length) {
+    StringBuilder payload = new StringBuilder(length);
+    for (int n = 0; n < length; ++n) {
+      payload.append("x");
+    }
+    return payload.toString();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flume/blob/ef54f9fd/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2073bf6..209db48 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1475,8 +1475,8 @@ Also please make sure that the operating system user of the Flume processes has
     };
 
 
-NetCat Source
-~~~~~~~~~~~~~
+NetCat TCP Source
+~~~~~~~~~~~~~~~~~
 
 A netcat-like source that listens on a given port and turns each line of text
 into an event. Acts like ``nc -k -l [host] [port]``. In other words,
@@ -1512,6 +1512,40 @@ Example for agent named a1:
   a1.sources.r1.port = 6666
   a1.sources.r1.channels = c1
 
+NetCat UDP Source
+~~~~~~~~~~~~~~~~~
+
+As per the original Netcat (TCP) source, this source listens on a given
+port, turns each line of text into an event and sends it via the connected channel.
+Acts like ``nc -u -k -l [host] [port]``.
+
+Required properties are in **bold**.
+
+==================  ===========  ===========================================
+Property Name       Default      Description
+==================  ===========  ===========================================
+**channels**        --
+**type**            --           The component type name, needs to be ``netcatudp``
+**bind**            --           Host name or IP address to bind to
+**port**            --           Port # to bind to
+remoteAddress       --           Name of the event header in which the sender's address is stored
+selector.type       replicating  replicating or multiplexing
+selector.*                       Depends on the selector.type value
+interceptors        --           Space-separated list of interceptors
+interceptors.*
+==================  ===========  ===========================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = netcatudp
+  a1.sources.r1.bind = 0.0.0.0
+  a1.sources.r1.port = 6666
+  a1.sources.r1.channels = c1
+
 
 Sequence Generator Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~
