NIFI-274 Adding syslog.port to ListenSyslog attributes, logging at warn level 
when rejecting tcp connections


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d328ac48
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d328ac48
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d328ac48

Branch: refs/heads/NIFI-274
Commit: d328ac481168a84675d8bec3477277ca57e81e52
Parents: 6daaad6
Author: Bryan Bende <[email protected]>
Authored: Wed Nov 4 11:21:16 2015 -0500
Committer: Bryan Bende <[email protected]>
Committed: Wed Nov 4 11:21:16 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/AbstractSyslogProcessor.java |  3 ++-
 .../org/apache/nifi/processors/standard/ListenSyslog.java |  7 +++++--
 .../apache/nifi/processors/standard/TestListenSyslog.java | 10 ++++++----
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index ac5586d..fea01bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -65,7 +65,8 @@ public abstract class AbstractSyslogProcessor extends 
AbstractProcessor {
         SENDER("syslog.sender"),
         BODY("syslog.body"),
         VALID("syslog.valid"),
-        PROTOCOL("syslog.protocol");
+        PROTOCOL("syslog.protocol"),
+        PORT("syslog.pprt");
 
         private String key;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index ff7be0d..5f76beb 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -90,6 +90,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
                     @WritesAttribute(attribute="syslog.valid", description="An 
indicator of whether this message matched the expected formats. " +
                             "If this value is false, the other attributes will 
be empty and only the original message will be available in the content."),
                     @WritesAttribute(attribute="syslog.protocol", 
description="The protocol over which the Syslog message was received."),
+                    @WritesAttribute(attribute="syslog.port", description="The 
port over which the Syslog message was received."),
                     @WritesAttribute(attribute="mime.type", description="The 
mime.type of the FlowFile which will be text/plain for Syslog messages.")})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
@@ -144,8 +145,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(CHARSET);
         descriptors.add(MAX_CONNECTIONS);
+        descriptors.add(CHARSET);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
         final Set<Relationship> relationships = new HashSet<>();
@@ -254,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
         attributes.put(SyslogAttributes.VALID.key(), 
String.valueOf(event.isValid()));
         attributes.put(SyslogAttributes.PROTOCOL.key(), 
context.getProperty(PROTOCOL).getValue());
+        attributes.put(SyslogAttributes.PORT.key(), 
context.getProperty(PORT).getValue());
         attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
         FlowFile flowFile = session.create();
@@ -457,7 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                                 // Check for available connections
                                 if (currentConnections.incrementAndGet() > 
maxConnections){
                                     currentConnections.decrementAndGet();
-                                    logger.info("Rejecting connection from {} 
because max connections has been met", new Object[]{ 
socketChannel.getRemoteAddress().toString() });
+                                    logger.warn("Rejecting connection from {} 
because max connections has been met",
+                                            new Object[]{ 
socketChannel.getRemoteAddress().toString() });
                                     IOUtils.closeQuietly(socketChannel);
                                     continue;
                                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 345e425..9795545 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -89,7 +89,7 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the datagrams", 
numMessages, numTransfered);
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue());
 
         } finally {
             // unschedule to close connections
@@ -131,7 +131,7 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", 
numMessages, numTransfered);
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -173,7 +173,7 @@ public class TestListenSyslog {
             Assert.assertEquals("Did not process all the messages", 
numMessages, numTransfered);
 
             MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            checkFlowFile(flowFile);
+            checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue());
         } finally {
             // unschedule to close connections
             proc.onUnscheduled();
@@ -244,7 +244,7 @@ public class TestListenSyslog {
     }
 
 
-    private void checkFlowFile(MockFlowFile flowFile) {
+    private void checkFlowFile(MockFlowFile flowFile, int port, String 
protocol) {
         flowFile.assertContentEquals(VALID_MESSAGE);
         Assert.assertEquals(PRI, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
         Assert.assertEquals(SEV, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
@@ -253,6 +253,8 @@ public class TestListenSyslog {
         Assert.assertEquals(HOST, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
         Assert.assertEquals(BODY, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
         Assert.assertEquals("true", 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
+        Assert.assertEquals(String.valueOf(port), 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
+        Assert.assertEquals(protocol, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
     }
 
     /**

Reply via email to