Repository: nifi
Updated Branches:
  refs/heads/master 49a62448c -> ef5bac207


NIFI-3738 Fixed NPE when ListenSyslog UDP datagram has zero length. Added 
default constructor to SyslogParser to allow map coercion for test. Added unit 
test.

NIFI-3738 Fixed NPE when ParseSyslog UDP datagram has zero length.
Added unit test.

NIFI-3738 Added licenses to new unit tests.

This closes #1694.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef5bac20
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef5bac20
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef5bac20

Branch: refs/heads/master
Commit: ef5bac207ee52d493c5cfaa5908cf3d839c05e10
Parents: 49a6244
Author: Andy LoPresto <[email protected]>
Authored: Tue Apr 25 12:10:16 2017 -0700
Committer: Bryan Bende <[email protected]>
Committed: Tue Apr 25 16:38:46 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  |  49 ++++----
 .../nifi/processors/standard/ParseSyslog.java   |  12 +-
 .../standard/syslog/SyslogParser.java           |   8 ++
 .../standard/ListenSyslogGroovyTest.groovy      | 111 +++++++++++++++++++
 .../standard/ParseSyslogGroovyTest.groovy       |  83 ++++++++++++++
 5 files changed, 233 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ef5bac20/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 38668e5..76d5cbf 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -18,6 +18,26 @@ package org.apache.nifi.processors.standard;
 
 import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -55,27 +75,6 @@ import 
org.apache.nifi.processors.standard.syslog.SyslogEvent;
 import org.apache.nifi.processors.standard.syslog.SyslogParser;
 import org.apache.nifi.ssl.SSLContextService;
 
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
@@ -410,11 +409,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             }
 
             final String sender = rawSyslogEvent.getSender();
-            FlowFile flowFile = flowFilePerSender.get(sender);
-            if (flowFile == null) {
-                flowFile = session.create();
-                flowFilePerSender.put(sender, flowFile);
-            }
+            FlowFile flowFile = flowFilePerSender.computeIfAbsent(sender, k -> 
session.create());
 
             if (shouldParse) {
                 boolean valid = true;
@@ -428,7 +423,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 // If the event is invalid, route it to 'invalid' and then 
stop.
                 // We create a separate FlowFile for this case instead of 
using 'flowFile',
                 // because the 'flowFile' object may already have data written 
to it.
-                if (!valid || !event.isValid()) {
+                if (!valid || event == null || !event.isValid()) {
                     FlowFile invalidFlowFile = session.create();
                     invalidFlowFile = 
session.putAllAttributes(invalidFlowFile, defaultAttributes);
                     if (sender != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef5bac20/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
index 90fa816..ae08b22 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -86,6 +85,8 @@ public class ParseSyslog extends AbstractProcessor {
         .description("Any FlowFile that is successfully parsed as a Syslog 
message will be to this Relationship.")
         .build();
 
+    private SyslogParser parser;
+
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -110,7 +111,12 @@ public class ParseSyslog extends AbstractProcessor {
         }
 
         final String charsetName = context.getProperty(CHARSET).getValue();
-        final SyslogParser parser = new 
SyslogParser(Charset.forName(charsetName));
+
+        // If the parser already exists and uses the same charset, it does not 
need to be re-initialized
+        if (parser == null || !parser.getCharsetName().equals(charsetName)) {
+            parser = new SyslogParser(Charset.forName(charsetName));
+        }
+
         final byte[] buffer = new byte[(int) flowFile.getSize()];
         session.read(flowFile, new InputStreamCallback() {
             @Override
@@ -128,7 +134,7 @@ public class ParseSyslog extends AbstractProcessor {
             return;
         }
 
-        if (!event.isValid()) {
+        if (event == null || !event.isValid()) {
             getLogger().error("Failed to parse {} as a Syslog message: it does 
not conform to any of the RFC formats supported; routing to failure", new 
Object[] {flowFile});
             session.transfer(flowFile, REL_FAILURE);
             return;

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef5bac20/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
index b5dbf22..52caedb 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard.syslog;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -71,6 +72,10 @@ public class SyslogParser {
 
     private Charset charset;
 
+    public SyslogParser() {
+        this(StandardCharsets.UTF_8);
+    }
+
     public SyslogParser(final Charset charset) {
         this.charset = charset;
     }
@@ -162,4 +167,7 @@ public class SyslogParser {
         return builder.build();
     }
 
+    public String getCharsetName() {
+        return charset == null ? StandardCharsets.UTF_8.name() : 
charset.name();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef5bac20/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
new file mode 100644
index 0000000..1c6b4f8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ListenSyslogGroovyTest.groovy
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.processor.ProcessContext
+import org.apache.nifi.processor.ProcessSessionFactory
+import org.apache.nifi.processors.standard.syslog.SyslogParser
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+@RunWith(JUnit4.class)
+class ListenSyslogGroovyTest extends GroovyTestCase {
+    private static final Logger logger = 
LoggerFactory.getLogger(ListenSyslogGroovyTest.class)
+
+    static final String ZERO_LENGTH_MESSAGE = "     \n"
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+    }
+
+    @After
+    void tearDown() throws Exception {
+    }
+
+    @Test
+    void testShouldHandleZeroLengthUDP() throws Exception {
+        // Arrange
+        final ListenSyslog proc = new ListenSyslog()
+        final TestRunner runner = TestRunners.newTestRunner(proc)
+        runner.setProperty(ListenSyslog.PROTOCOL, 
ListenSyslog.TCP_VALUE.getValue())
+        runner.setProperty(ListenSyslog.PORT, "0")
+
+        // schedule to start listening on a random port
+        final ProcessSessionFactory processSessionFactory = 
runner.getProcessSessionFactory()
+        final ProcessContext context = runner.getProcessContext()
+        proc.onScheduled(context)
+
+        // Inject a SyslogParser which will always return null
+        def nullEventParser = [parseEvent: { byte[] bytes, String sender ->
+            logger.mock("Regardless of input bytes: 
[${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return 
null")
+            return null
+        }] as SyslogParser
+        proc.parser = nullEventParser
+
+        final int numMessages = 10
+        final int port = proc.getPort()
+        Assert.assertTrue(port > 0)
+
+        // write some TCP messages to the port in the background
+        final Thread sender = new Thread(new 
TestListenSyslog.SingleConnectionSocketSender(port, numMessages, 100, 
ZERO_LENGTH_MESSAGE))
+        sender.setDaemon(true)
+        sender.start()
+
+        // Act
+
+        // call onTrigger until all messages have been read, or 30 seconds have passed
+        try {
+            int numFailed = 0
+            long timeout = System.currentTimeMillis() + 30000
+
+            while (numFailed < numMessages && System.currentTimeMillis() < 
timeout) {
+                Thread.sleep(50)
+                proc.onTrigger(context, processSessionFactory)
+                numFailed = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size()
+            }
+
+            int numSuccess = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size()
+            logger.info("Transferred " + numSuccess + " to SUCCESS and " + 
numFailed + " to INVALID")
+
+            // Assert
+
+            // all messages should be transferred to invalid
+            Assert.assertEquals("Did not process all the messages", 
numMessages, numFailed)
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled()
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef5bac20/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy
new file mode 100644
index 0000000..208eaee
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/ParseSyslogGroovyTest.groovy
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.processors.standard.syslog.SyslogParser
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.util.encoders.Hex
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+@RunWith(JUnit4.class)
+class ParseSyslogGroovyTest extends GroovyTestCase {
+    private static final Logger logger = 
LoggerFactory.getLogger(ParseSyslogGroovyTest.class)
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+    }
+
+    @After
+    void tearDown() throws Exception {
+    }
+
+    @Test
+    void testShouldHandleZeroLengthUDP() throws Exception {
+        // Arrange
+        final ParseSyslog proc = new ParseSyslog()
+        final TestRunner runner = TestRunners.newTestRunner(proc)
+        runner.setProperty(ParseSyslog.CHARSET, 
ParseSyslog.CHARSET.defaultValue)
+
+        // Inject a SyslogParser which will always return null
+        def nullEventParser = [parseEvent: { byte[] bytes, String sender ->
+            logger.mock("Regardless of input bytes: 
[${Hex.toHexString(bytes)}] and sender: [${sender}], this parser will return 
null")
+            return null
+        }] as SyslogParser
+        proc.parser = nullEventParser
+
+        final int numMessages = 10
+
+        // Act
+        numMessages.times {
+            runner.enqueue("Doesn't matter what is enqueued here")
+        }
+        runner.run(numMessages)
+
+        int numFailed = 
runner.getFlowFilesForRelationship(ParseSyslog.REL_FAILURE).size()
+        int numSuccess = 
runner.getFlowFilesForRelationship(ParseSyslog.REL_SUCCESS).size()
+        logger.info("Transferred " + numSuccess + " to SUCCESS and " + 
numFailed + " to FAILURE")
+
+        // Assert
+
+        // all messages should be transferred to failure
+        Assert.assertEquals("Did not process all the messages", numMessages, 
numFailed)
+    }
+}

Reply via email to