Updated Branches: refs/heads/trunk 67454a71a -> 9790ca758
FLUME-2217. Add option to preserve all Syslog headers in syslog sources (Jeff Lord via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9790ca75 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9790ca75 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9790ca75 Branch: refs/heads/trunk Commit: 9790ca7587060285efa4ae64591cea17dd3f00cf Parents: 67454a7 Author: Mike Percy <[email protected]> Authored: Tue Dec 10 14:38:06 2013 -0800 Committer: Mike Percy <[email protected]> Committed: Tue Dec 10 14:56:07 2013 -0800 ---------------------------------------------------------------------- .../flume/source/MultiportSyslogTCPSource.java | 15 ++- .../org/apache/flume/source/SyslogParser.java | 11 +- .../apache/flume/source/SyslogTcpSource.java | 4 +- .../apache/flume/source/SyslogUDPSource.java | 19 ++++ .../org/apache/flume/source/SyslogUtils.java | 40 +++---- .../source/TestMultiportSyslogTCPSource.java | 9 +- .../apache/flume/source/TestSyslogParser.java | 16 ++- .../flume/source/TestSyslogTcpSource.java | 14 +-- .../flume/source/TestSyslogUdpSource.java | 113 ++++++++++++------- .../apache/flume/source/TestSyslogUtils.java | 25 ++-- flume-ng-doc/sphinx/FlumeUserGuide.rst | 6 +- 11 files changed, 176 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java index 884fd62..427e0e3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java +++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java @@ -67,6 +67,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements private SourceCounter sourceCounter = null; private Charset defaultCharset; private ThreadSafeDecoder defaultDecoder; + private boolean keepFields; public MultiportSyslogTCPSource() { portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>(); @@ -138,6 +139,10 @@ public class MultiportSyslogTCPSource extends AbstractSource implements SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE, SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE); + keepFields = context.getBoolean( + SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, + SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS); + if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } @@ -159,7 +164,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize, getChannelProcessor(), sourceCounter, portHeader, defaultDecoder, - portCharsets)); + portCharsets, keepFields)); for (int port : ports) { InetSocketAddress addr; @@ -213,11 +218,12 @@ public class MultiportSyslogTCPSource extends AbstractSource implements private final LineSplitter lineSplitter; private final ThreadSafeDecoder defaultDecoder; private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets; + private final boolean keepFields; public MultiportSyslogHandler(int maxEventSize, int batchSize, ChannelProcessor cp, SourceCounter ctr, String portHeader, ThreadSafeDecoder defaultDecoder, - ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets) { + ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets, boolean keepFields) { channelProcessor = cp; sourceCounter = ctr; this.maxEventSize = maxEventSize; @@ -225,6 +231,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements this.portHeader = portHeader; this.defaultDecoder = 
defaultDecoder; this.portCharsets = portCharsets; + this.keepFields = keepFields; syslogParser = new SyslogParser(); lineSplitter = new LineSplitter(maxEventSize); } @@ -321,7 +328,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements /** * Decodes a syslog-formatted ParsedLine into a Flume Event. * @param parsedBuf Buffer containing characters to be parsed - * @param port Incoming port + * @param decoder Character set is configurable on a per-port basis. * @return */ Event parseEvent(ParsedBuffer parsedBuf, CharsetDecoder decoder) { @@ -351,7 +358,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements Event event; try { - event = syslogParser.parseMessage(msg, decoder.charset()); + event = syslogParser.parseMessage(msg, decoder.charset(), keepFields); if (parsedBuf.incomplete) { event.getHeaders().put(SyslogUtils.EVENT_STATUS, SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus()); http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java index bf3305c..557d121 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java @@ -33,6 +33,8 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; import org.apache.flume.Event; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.event.EventBuilder; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; @@ -40,6 +42,8 @@ import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@InterfaceAudience.Private
+@InterfaceStability.Evolving public class SyslogParser { private static final Logger logger = @@ -53,7 +57,6 @@ public class SyslogParser { private static final String timePat = "yyyy-MM-dd'T'HH:mm:ss"; private static final int RFC3164_LEN = 15; private static final int RFC5424_PREFIX_LEN = 19; - private final DateTimeFormatter timeParser; private Cache<String, Long> timestampCache; @@ -76,7 +79,7 @@ public class SyslogParser { * @return Parsed Flume Event * @throws IllegalArgumentException if unable to successfully parse message */ - public Event parseMessage(String msg, Charset charset) { + public Event parseMessage(String msg, Charset charset, boolean keepFields) { Map<String, String> headers = Maps.newHashMap(); int msgLen = msg.length(); @@ -164,9 +167,11 @@ public class SyslogParser { // EventBuilder will do a copy of its own, so no defensive copy of the body String data = ""; - if (msgLen > nextSpace + 1) { + if (msgLen > nextSpace + 1 && !keepFields) { curPos = nextSpace + 1; data = msg.substring(curPos); + } else { + data = msg; } Event event = EventBuilder.withBody(data, charset, headers); http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java index 7a12d27..e84e4b6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java @@ -68,8 +68,8 @@ implements EventDrivenSource, Configurable { syslogUtils.setEventSize(eventSize); } - public void setKeepFields(boolean removeFields){ - syslogUtils.setKeepFields(removeFields); + public void setKeepFields(boolean keepFields){ + syslogUtils.setKeepFields(keepFields); } public void setFormater(Map<String, 
String> prop) { http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java index 96a9e85..8fb251b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java @@ -19,10 +19,12 @@ package org.apache.flume.source; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.CounterGroup; @@ -53,6 +55,7 @@ public class SyslogUDPSource extends AbstractSource private String host = null; private Channel nettyChannel; private Map<String, String> formaterProp; + private boolean keepFields; private static final Logger logger = LoggerFactory .getLogger(SyslogUDPSource.class); @@ -65,6 +68,10 @@ public class SyslogUDPSource extends AbstractSource syslogUtils.addFormats(prop); } + public void setKeepFields(boolean keepFields) { + syslogUtils.setKeepFields(keepFields); + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { try { @@ -90,6 +97,7 @@ public class SyslogUDPSource extends AbstractSource (new OioDatagramChannelFactory(Executors.newCachedThreadPool())); final syslogHandler handler = new syslogHandler(); handler.setFormater(formaterProp); + handler.setKeepFields(keepFields); serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { @@ -132,6 +140,17 @@ public class SyslogUDPSource extends AbstractSource host = 
context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); formaterProp = context.getSubProperties( SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); + keepFields = context.getBoolean(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, + SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS); } + @VisibleForTesting + public int getSourcePort() { + SocketAddress localAddress = nettyChannel.getLocalAddress(); + if (localAddress instanceof InetSocketAddress) { + InetSocketAddress addr = (InetSocketAddress) localAddress; + return addr.getPort(); + } + return 0; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java index f2ea932..a77bfc9 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -49,15 +48,18 @@ public class SyslogUtils { final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss"; final public static String SYSLOG_MSG_RFC5424_0 = - "(?:\\d\\s)?" 
+// version - // yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) - "(?:(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp + "(?:\\<\\d{1,3}\\>\\d?\\s?)" + // priority + /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */ + "(?:" + + "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" + + "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp "\\s" + // separator "(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null) "\\s" + // separator "(.*)$"; // body final public static String SYSLOG_MSG_RFC3164_0 = + "(?:\\<\\d{1,3}\\>\\d?\\s?)" + // stamp MMM d HH:mm:ss, single digit date has two spaces "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + "\\s" + // separator @@ -225,8 +227,13 @@ public class SyslogUtils { headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus()); } - if ((msgBody != null) && (msgBody.length() > 0) && !keepFields) { - body = msgBody.getBytes(); + if (!keepFields) { + if ((msgBody != null) && (msgBody.length() > 0)) { + body = msgBody.getBytes(); + } else { + // Parse failed. + body = baos.toByteArray(); + } } else { body = baos.toByteArray(); } @@ -311,14 +318,15 @@ public class SyslogUtils { switch (m) { case START: if (b == '<') { + baos.write(b); m = Mode.PRIO; } else if(b == '\n'){ - //If the character is \n, it was because the last event was exactly - //as long as the maximum size allowed and - //the only remaining character was the delimiter - '\n', or - //multiple delimiters were sent in a row. - //Just ignore it, and move forward, don't change the mode. - //This is a no-op, just ignore it. + //If the character is \n, it was because the last event was exactly + //as long as the maximum size allowed and + //the only remaining character was the delimiter - '\n', or + //multiple delimiters were sent in a row. + //Just ignore it, and move forward, don't change the mode. 
+ //This is a no-op, just ignore it. logger.debug("Delimiter found while in START mode, ignoring.."); } else { @@ -329,6 +337,7 @@ public class SyslogUtils { } break; case PRIO: + baos.write(b); if (b == '>') { m = Mode.DATA; } else { @@ -336,9 +345,6 @@ public class SyslogUtils { prio.append(ch); if (!Character.isDigit(ch)) { isBadEvent = true; - //Append the priority to baos: - String badPrio = "<"+ prio; - baos.write(badPrio.getBytes()); //If we hit a bad priority, just write as if everything is data. m = Mode.DATA; } @@ -367,10 +373,6 @@ public class SyslogUtils { doneReading = true; e = buildEvent(); } - //} catch (IndexOutOfBoundsException eF) { - // e = buildEvent(prio, baos); - } catch (IOException e1) { - //no op } finally { // no-op } http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java index 680e592..9b97c8c 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java @@ -70,6 +70,7 @@ public class TestMultiportSyslogTCPSource { private final String stamp1 = time.toString(); private final String host1 = "localhost.localdomain"; private final String data1 = "proc1 - some msg"; + private final static boolean KEEP_FIELDS = false; /** * Helper function to generate a syslog message. 
@@ -205,7 +206,8 @@ public class TestMultiportSyslogTCPSource { new MultiportSyslogTCPSource.MultiportSyslogHandler(maxLen, 100, null, null, SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER, new ThreadSafeDecoder(Charsets.UTF_8), - new ConcurrentHashMap<Integer, ThreadSafeDecoder>()); + new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), + KEEP_FIELDS); Event event = handler.parseEvent(parsedLine, Charsets.UTF_8.newDecoder()); String body = new String(event.getBody(), Charsets.UTF_8); @@ -231,7 +233,8 @@ public class TestMultiportSyslogTCPSource { 1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()), new SourceCounter("test"), "port", new ThreadSafeDecoder(Charsets.UTF_8), - new ConcurrentHashMap<Integer, ThreadSafeDecoder>()); + new ConcurrentHashMap<Integer, ThreadSafeDecoder>(), + KEEP_FIELDS); ParsedBuffer parsedBuf = new ParsedBuffer(); parsedBuf.incomplete = false; @@ -331,7 +334,7 @@ public class TestMultiportSyslogTCPSource { // defaults to UTF-8 MultiportSyslogHandler handler = new MultiportSyslogHandler( 1000, 10, chanProc, new SourceCounter("test"), "port", - new ThreadSafeDecoder(Charsets.UTF_8), portCharsets); + new ThreadSafeDecoder(Charsets.UTF_8), portCharsets, KEEP_FIELDS); // initialize buffers handler.sessionCreated(session1); http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java index 258c2f1..2809163 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java @@ -29,7 +29,6 @@ import org.junit.Assert; import org.junit.Test; public class TestSyslogParser { - @Test public void testRfc5424DateParsing() { final 
String[] examples = { @@ -55,7 +54,7 @@ public class TestSyslogParser { Charset charset = Charsets.UTF_8; List<String> messages = Lists.newArrayList(); - // supported examples from RFC 3161 + // supported examples from RFC 3164 messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " + "lonvick on /dev/pts/8"); messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!"); @@ -76,8 +75,19 @@ public class TestSyslogParser { messages.add("<13>2003-08-24T05:14:15Z localhost snarf?"); messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!"); + // test with default keepFields = false + for (String msg : messages) { + boolean keepFields = false; + Event event = parser.parseMessage(msg, charset, keepFields); + Assert.assertNull("Failure to parse known-good syslog message", + event.getHeaders().get(SyslogUtils.EVENT_STATUS)); + } + + // test that priority, timestamp and hostname are preserved in event body for (String msg : messages) { - Event event = parser.parseMessage(msg, charset); + boolean keepFields = true; + Event event = parser.parseMessage(msg, charset, keepFields); + Assert.assertArrayEquals(event.getBody(), msg.getBytes()); Assert.assertNull("Failure to parse known-good syslog message", event.getHeaders().get(SyslogUtils.EVENT_STATUS)); } http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java index a6a1d5b..22fa200 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java @@ -49,13 +49,8 @@ public class TestSyslogTcpSource { private final String stamp1 = time.toString(); private final String host1 = 
"localhost.localdomain"; private final String data1 = "test syslog data"; - private final String bodyWithTandH = stamp1 + " " + host1 + " " + data1; - // Helper function to generate a syslog message. - private byte[] getEvent() { - // timestamp with 'Z' appended, translates to UTC - final String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; - return msg1.getBytes(); - } + private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " + + data1 + "\n"; private void init(boolean keepFields){ source = new SyslogTcpSource(); @@ -87,7 +82,7 @@ public class TestSyslogTcpSource { for (int i = 0; i < 10 ; i++) { syslogSocket = new Socket( InetAddress.getLocalHost(), source.getSourcePort()); - syslogSocket.getOutputStream().write(getEvent()); + syslogSocket.getOutputStream().write(bodyWithTandH.getBytes()); syslogSocket.close(); } @@ -116,7 +111,8 @@ public class TestSyslogTcpSource { String str = new String(e.getBody(), Charsets.UTF_8); logger.info(str); if (keepFields) { - Assert.assertArrayEquals(bodyWithTandH.getBytes(), e.getBody()); + Assert.assertArrayEquals(bodyWithTandH.trim().getBytes(), + e.getBody()); } else if (!keepFields) { Assert.assertArrayEquals(data1.getBytes(), e.getBody()); } http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java index eae26ed..36f6479 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java @@ -20,15 +20,11 @@ package org.apache.flume.source; import java.util.ArrayList; import java.util.List; - +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; 
+import java.net.DatagramSocket; import com.google.common.base.Charsets; -import org.apache.log4j.Logger; -import org.apache.log4j.net.SyslogAppender; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -38,16 +34,27 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; -import org.apache.flume.source.SyslogUtils; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.LoggerFactory; + public class TestSyslogUdpSource { + private static final org.slf4j.Logger logger = + LoggerFactory.getLogger(TestSyslogUdpSource.class); private SyslogUDPSource source; private Channel channel; - private static final int TEST_SYSLOG_PORT = 14455; - - @Before - public void setUp() { - source = new SyslogUDPSource(); //SyslogTcpSource(); + private static final int TEST_SYSLOG_PORT = 0; + private final DateTime time = new DateTime(); + private final String stamp1 = time.toString(); + private final String host1 = "localhost.localdomain"; + private final String data1 = "test UDP syslog data"; + private final String bodyWithTandH = "<10>" + stamp1 + " " + host1 + " " + + data1; + + private void init(boolean keepFields) { + source = new SyslogUDPSource(); channel = new MemoryChannel(); Configurables.configure(channel, new Context()); @@ -61,49 +68,69 @@ public class TestSyslogUdpSource { source.setChannelProcessor(new ChannelProcessor(rcs)); Context context = new Context(); context.put("port", String.valueOf(TEST_SYSLOG_PORT)); + context.put("keepFields", String.valueOf(keepFields)); + source.configure(context); + } - @Test - public void testAppend() throws InterruptedException { - Logger logger = Logger.getLogger(getClass()); - // use the Apache syslog appender 
to write to syslog source - SyslogAppender appender = new SyslogAppender(null, - "localhost:"+TEST_SYSLOG_PORT, SyslogAppender.LOG_FTP); - logger.addAppender(appender); - Event e = null; - Event e2 = null; + /** Tests the keepFields configuration parameter (enabled or disabled) + using SyslogUDPSource.*/ + private void runKeepFieldsTest(boolean keepFields) throws IOException { + init(keepFields); source.start(); + // Write some message to the syslog port + DatagramSocket syslogSocket; + DatagramPacket datagramPacket; + datagramPacket = new DatagramPacket(bodyWithTandH.getBytes(), + bodyWithTandH.getBytes().length, + InetAddress.getLocalHost(), source.getSourcePort()); + for (int i = 0; i < 10 ; i++) { + syslogSocket = new DatagramSocket(); + syslogSocket.send(datagramPacket); + syslogSocket.close(); + } - // write to syslog - logger.info("test flume syslog"); - logger.info(""); - + List<Event> channelEvents = new ArrayList<Event>(); Transaction txn = channel.getTransaction(); + txn.begin(); + for (int i = 0; i < 10; i++) { + Event e = channel.take(); + Assert.assertNotNull(e); + channelEvents.add(e); + } + try { - txn.begin(); - e = channel.take(); - e2 = channel.take(); txn.commit(); + } catch (Throwable t) { + txn.rollback(); } finally { txn.close(); } source.stop(); - logger.removeAppender(appender); - - String str = new String(e.getBody(), Charsets.UTF_8); - logger.info(str); - Assert.assertNotNull(e); - Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8), - e.getHeaders().get(SyslogUtils.SYSLOG_FACILITY)); - Assert.assertArrayEquals(e.getBody(), "test flume syslog".getBytes()); - - Assert.assertNotNull(e2); - Assert.assertEquals(String.valueOf(SyslogAppender.LOG_FTP / 8), - e2.getHeaders().get(SyslogUtils.SYSLOG_FACILITY)); - Assert.assertArrayEquals(e2.getBody(), "".getBytes()); + for (Event e : channelEvents) { + Assert.assertNotNull(e); + String str = new String(e.getBody(), Charsets.UTF_8); + logger.info(str); + if (keepFields) { + 
Assert.assertArrayEquals(bodyWithTandH.getBytes(), + e.getBody()); + } else if (!keepFields) { + Assert.assertArrayEquals(data1.getBytes(), e.getBody()); + } + } } + @Test + public void testKeepFields() throws IOException { + runKeepFieldsTest(true); + } + + @Test + public void testRemoveFields() throws IOException { + runKeepFieldsTest(false); + } } + http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java index 898096b..82b7dd0 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java @@ -251,7 +251,8 @@ public class TestSyslogUtils { Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, headers.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim()); + Assert.assertEquals(priority + goodData1.trim(), + new String(e.getBody()).trim()); } @@ -277,7 +278,8 @@ public class TestSyslogUtils { Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), headers.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); + Assert.assertEquals(badData1.trim(), new String(e.getBody()) + .trim()); Event e2 = util.extractEvent(buff); if(e2 == null){ @@ -288,7 +290,8 @@ public class TestSyslogUtils { Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, headers2.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(goodData1.trim(), new 
String(e2.getBody()).trim()); + Assert.assertEquals(priority + goodData1.trim(), + new String(e2.getBody()).trim()); } @Test @@ -310,7 +313,8 @@ public class TestSyslogUtils { Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, headers2.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(goodData1.trim(), new String(e2.getBody()).trim()); + Assert.assertEquals(priority + goodData1.trim(), + new String(e2.getBody()).trim()); Event e = util.extractEvent(buff); @@ -379,7 +383,8 @@ public class TestSyslogUtils { Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, headers.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim()); + Assert.assertEquals(priority + goodData1.trim(), + new String(e.getBody()).trim()); Event e2 = util.extractEvent(buff); @@ -391,14 +396,16 @@ public class TestSyslogUtils { Assert.assertEquals("4", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(null, headers.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals(goodData2.trim(), new String(e2.getBody()).trim()); + Assert.assertEquals(priority2 + goodData2.trim(), + new String(e2.getBody()).trim()); } @Test public void testExtractBadEventLarge() { String badData1 = "<10> bad bad data bad bad\n"; - SyslogUtils util = new SyslogUtils(5, true, false); + // The minimum size (which is 10) overrides the 5 specified here. 
+ SyslogUtils util = new SyslogUtils(5, false, false); ChannelBuffer buff = ChannelBuffers.buffer(100); buff.writeBytes(badData1.getBytes()); Event e = util.extractEvent(buff); @@ -410,7 +417,7 @@ public class TestSyslogUtils { Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus(), headers.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals("bad bad d".trim(), new String(e.getBody()).trim()); + Assert.assertEquals("<10> bad b".trim(), new String(e.getBody()).trim()); Event e2 = util.extractEvent(buff); @@ -422,7 +429,7 @@ public class TestSyslogUtils { Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), headers2.get(SyslogUtils.EVENT_STATUS)); - Assert.assertEquals("ata bad ba".trim(), new String(e2.getBody()).trim()); + Assert.assertEquals("ad data ba".trim(), new String(e2.getBody()).trim()); } http://git-wip-us.apache.org/repos/asf/flume/blob/9790ca75/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 0737c44..ae66f89 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1171,7 +1171,7 @@ Property Name Default Description **host** -- Host name or IP address to bind to **port** -- Port # to bind to eventSize 2500 Maximum size of a single event line, in bytes -keepFields false Setting this to true will preserve the +keepFields false Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event. selector.type replicating or multiplexing selector.* replicating Depends on the selector.type value @@ -1209,6 +1209,8 @@ Property Name Default Description **host** -- Host name or IP address to bind to. 
**ports** -- Space-separated list (one or more) of ports to bind to. eventSize 2500 Maximum size of a single event line, in bytes. +keepFields false Setting this to true will preserve the + Priority, Timestamp and Hostname in the body of the event. portHeader -- If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. charset.default UTF-8 Default character set used while parsing syslog events into strings. charset.port.<port> -- Character set is configurable on a per-port basis. @@ -1243,6 +1245,8 @@ Property Name Default Description **type** -- The component type name, needs to be ``syslogudp`` **host** -- Host name or IP address to bind to **port** -- Port # to bind to +keepFields false Setting this to true will preserve the Priority, + Timestamp and Hostname in the body of the event. selector.type replicating or multiplexing selector.* replicating Depends on the selector.type value interceptors -- Space-separated list of interceptors
