Repository: nifi Updated Branches: refs/heads/master 201eac052 -> 618f22e11
http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java new file mode 100644 index 0000000..40a9123 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestPutSyslog { + + private MockCollectingSender sender; + private MockPutSyslog proc; + private TestRunner runner; + + @Before + public void setup() throws IOException { + sender = new MockCollectingSender(); + proc = new MockPutSyslog(sender); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSyslog.HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.PORT, "12345"); + } + + @Test + public void testValidMessageStaticPropertiesUdp() { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0)); + } + + @Test + public void testValidMessageStaticPropertiesTcp() { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE); + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0).replace("\n", "")); + } + + @Test + public void testValidMessageStaticPropertiesNoVersion() { + final String pri = "34"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0)); + } + + @Test + public void testValidMessageELProperties() { + final String pri = "34"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String expectedMessage = "<" + pri + ">" + stamp + " " + host + " " + body; + + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put("syslog.priority", pri); + attributes.put("syslog.timestamp", stamp); + attributes.put("syslog.hostname", host); + attributes.put("syslog.body", body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + Assert.assertEquals(expectedMessage, sender.messages.get(0)); + } + + @Test + public void testInvalidMessageELProperties() { + final String pri = "34"; + final String stamp = "not-a-timestamp"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put("syslog.priority", pri); + attributes.put("syslog.timestamp", stamp); + attributes.put("syslog.hostname", host); + attributes.put("syslog.body", body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID, 1); + Assert.assertEquals(0, sender.messages.size()); + } + + @Test + public void testIOExceptionOnSend() throws IOException { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + proc = new MockPutSyslog(new MockErrorSender()); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSyslog.HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.PORT, "12345"); + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 1); + Assert.assertEquals(0, sender.messages.size()); + } + + @Test + public void testIOExceptionCreatingConnection() throws IOException { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + Processor proc = new MockCreationErrorPutSyslog(new MockErrorSender(), 1); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSyslog.HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.PORT, "12345"); + runner.setProperty(PutSyslog.BATCH_SIZE, "1"); + runner.setProperty(PutSyslog.MSG_PRIORITY, pri); + runner.setProperty(PutSyslog.MSG_VERSION, version); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp); + runner.setProperty(PutSyslog.MSG_HOSTNAME, host); + runner.setProperty(PutSyslog.MSG_BODY, body); + + // the first run will throw IOException when calling send so the connection won't be re-qeued + // the second run will try to create a new connection but throw an exception which should be caught and route files to failure + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(2); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE, 2); + Assert.assertEquals(0, sender.messages.size()); + } + + @Test + public void testLargeMessageFailure() { + final String pri = "34"; + final String stamp = "2015-10-15T22:14:15.003Z"; + final String host = "mymachine.example.com"; + + final StringBuilder bodyBuilder = new StringBuilder(4096); + for (int i=0; i < 4096; i++) { + bodyBuilder.append("a"); + } + + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put("syslog.priority", pri); + attributes.put("syslog.timestamp", stamp); + attributes.put("syslog.hostname", host); + attributes.put("syslog.body", bodyBuilder.toString()); + + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + runner.run(); + + // should have dynamically created a larger buffer + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + Assert.assertEquals(1, sender.messages.size()); + } + + @Test + public void testNoIncomingData() { + runner.setProperty(PutSyslog.MSG_PRIORITY, "10"); + runner.setProperty(PutSyslog.MSG_VERSION, "1"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "2003-10-11T22:14:15.003Z"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "localhost"); + runner.setProperty(PutSyslog.MSG_BODY, "test"); + + // queue one file but run several times to test no incoming data + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8"))); + runner.run(5); + + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 1); + } + + @Test + public void testBatchingFlowFiles() { + runner.setProperty(PutSyslog.BATCH_SIZE, "10"); + runner.setProperty(PutSyslog.MSG_PRIORITY, "${syslog.priority}"); + runner.setProperty(PutSyslog.MSG_TIMESTAMP, "${syslog.timestamp}"); + runner.setProperty(PutSyslog.MSG_HOSTNAME, "${syslog.hostname}"); + runner.setProperty(PutSyslog.MSG_BODY, "${syslog.body}"); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put("syslog.priority", "10"); + attributes.put("syslog.timestamp", "2015-10-11T22:14:15.003Z"); + attributes.put("syslog.hostname", "my.host.name"); + attributes.put("syslog.body", "blah blah blah"); + + for (int i=0; i < 15; i++) { + runner.enqueue("incoming data".getBytes(Charset.forName("UTF-8")), attributes); + } + + runner.run(); + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 10); + Assert.assertEquals(10, sender.messages.size()); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS, 15); + Assert.assertEquals(15, sender.messages.size()); + } + + // Mock processor to return a MockCollectingSender + static class MockPutSyslog extends PutSyslog { + + ChannelSender mockSender; + + public MockPutSyslog(ChannelSender sender) { + this.mockSender = sender; + } + + @Override + protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException { + return mockSender; + } + } + + // Mock processor to test exception when creating new senders + static class MockCreationErrorPutSyslog extends PutSyslog { + + int numSendersCreated; + int numSendersAllowed; + ChannelSender mockSender; + + public MockCreationErrorPutSyslog(ChannelSender sender, int numSendersAllowed) { + this.mockSender = sender; + this.numSendersAllowed = numSendersAllowed; + } + + @Override + protected ChannelSender createSender(String protocol, String host, int port, Charset charset, BlockingQueue<ByteBuffer> bufferPool) throws IOException { + if (numSendersCreated >= numSendersAllowed) { + throw new IOException("too many senders"); + } + numSendersCreated++; + return mockSender; + } + } + + // Mock sender that saves any messages passed to send() + static class MockCollectingSender extends PutSyslog.ChannelSender { + + List<String> messages = new ArrayList<>(); + + public MockCollectingSender() throws IOException { + super("myhost", 0, new LinkedBlockingQueue<ByteBuffer>(1), Charset.forName("UTF-8")); + this.bufferPool.offer(ByteBuffer.allocate(1024)); + } + + @Override + public void send(String message) throws IOException { + messages.add(message); + super.send(message); + } + + @Override + void write(ByteBuffer buffer) throws IOException { + + } + + @Override + boolean isConnected() { + return true; + } + + @Override + void close() { + + } + } + + // Mock sender that throws IOException on calls to write() or send() + static class MockErrorSender extends PutSyslog.ChannelSender { + + public MockErrorSender() throws IOException { + super(null, 0, null, null); + } + + @Override + public void send(String message) throws IOException { + throw new IOException("error"); + } + + @Override + void write(ByteBuffer buffer) throws IOException { + throw new IOException("error"); + } + + @Override + boolean isConnected() { + return false; + } + + @Override + void close() { + + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java new file mode 100644 index 0000000..d1b1bbf --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +public class TestSyslogParser { + + static final Charset CHARSET = Charset.forName("UTF-8"); + + private SyslogParser parser; + + @Before + public void setup() { + parser = new SyslogParser(CHARSET); + } + + @Test + public void testRFC3164SingleDigitDay() { + final String pri = "10"; + final String stamp = "Oct 1 13:14:04"; + final String host = "my.host.com"; + final String body = "some body message"; + final String message = "<" + pri + ">" + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("2", event.getSeverity()); + Assert.assertEquals("1", event.getFacility()); + Assert.assertNull(event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC3164DoubleDigitDay() { + final String pri = "31"; + final String stamp = "Oct 13 14:14:43"; + final String host = "localhost"; + final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000"; + final String message = "<" + pri + ">" + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("7", event.getSeverity()); + Assert.assertEquals("3", event.getFacility()); + Assert.assertNull(event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC3164WithVersion() { + final String pri = "31"; + final String version = "1"; + final String stamp = "Oct 13 14:14:43"; + final String host = "localhost"; + final String body = "AppleCameraAssistant[470]: DeviceMessageNotificationCallback: kIOPMMessageSystemPowerEventOccurred: 0x00000000"; + final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("7", event.getSeverity()); + Assert.assertEquals("3", event.getFacility()); + Assert.assertEquals(version, event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC5424WithVersion() { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("2", event.getSeverity()); + Assert.assertEquals("4", event.getFacility()); + Assert.assertEquals(version, event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testRFC5424WithoutVersion() { + final String pri = "34"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String body = "su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8"; + + final String message = "<" + pri + ">" + stamp + " " + host + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertEquals(pri, event.getPriority()); + Assert.assertEquals("2", event.getSeverity()); + Assert.assertEquals("4", event.getFacility()); + Assert.assertNull(event.getVersion()); + Assert.assertEquals(stamp, event.getTimeStamp()); + Assert.assertEquals(host, event.getHostName()); + Assert.assertEquals(body, event.getMsgBody()); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testTrailingNewLine() { + final String message = "<31>Oct 13 15:43:23 localhost.home some message\n"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testVariety() { + final List<String> messages = new ArrayList<>(); + + // supported examples from RFC 3164 + messages.add("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for " + + "lonvick on /dev/pts/8"); + messages.add("<13>Feb 5 17:32:18 10.0.0.99 Use the BFG!"); + messages.add("<165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% " + + "It's time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK # " + + "Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport: " + + "Conveyer1=OK, Conveyer2=OK # %%"); + messages.add("<0>Oct 22 10:52:12 scapegoat 1990 Oct 22 10:52:01 TZ-6 " + + "scapegoat.dmz.example.org 10.1.2.3 sched[0]: That's All Folks!"); + + // supported examples from RFC 5424 + messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " + + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8"); + messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " + + "8710 - - %% It's time to make the do-nuts."); + + // non-standard (but common) messages (RFC3339 dates, no version digit) + messages.add("<13>2003-08-24T05:14:15Z localhost snarf?"); + messages.add("<13>2012-08-16T14:34:03-08:00 127.0.0.1 test shnap!"); + + for (final String message : messages) { + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertTrue(event.isValid()); + } + } + + @Test + public void testInvalidPriority() { + final String message = "10 Oct 13 14:14:43 localhost some body of the message"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertFalse(event.isValid()); + Assert.assertEquals(message, event.getFullMessage()); + } + + @Test + public void testParseWithSender() { + final String sender = "127.0.0.1"; + final String message = "<31>Oct 13 15:43:23 localhost.home some message\n"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final SyslogEvent event = parser.parseEvent(buffer, sender); + Assert.assertNotNull(event); + Assert.assertTrue(event.isValid()); + Assert.assertEquals(sender, event.getSender()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9c542432/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 841818a..d48b755 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -46,7 +46,7 @@ <module>nifi-image-bundle</module> <module>nifi-avro-bundle</module> <module>nifi-couchbase-bundle</module> - </modules> + </modules> <dependencyManagement> <dependencies> <dependency> @@ -131,4 +131,4 @@ </dependency> </dependencies> </dependencyManagement> -</project> +</project> \ No newline at end of file
