NIFI-274 Adding syslog.port to ListenSyslog attributes, logging at warn level when rejecting tcp connections
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d328ac48 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d328ac48 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d328ac48 Branch: refs/heads/NIFI-274 Commit: d328ac481168a84675d8bec3477277ca57e81e52 Parents: 6daaad6 Author: Bryan Bende <[email protected]> Authored: Wed Nov 4 11:21:16 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Wed Nov 4 11:21:16 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/AbstractSyslogProcessor.java | 3 ++- .../org/apache/nifi/processors/standard/ListenSyslog.java | 7 +++++-- .../apache/nifi/processors/standard/TestListenSyslog.java | 10 ++++++---- 3 files changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java index ac5586d..fea01bd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java @@ -65,7 +65,8 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor { SENDER("syslog.sender"), BODY("syslog.body"), VALID("syslog.valid"), - PROTOCOL("syslog.protocol"); + PROTOCOL("syslog.protocol"), + PORT("syslog.pprt"); private String key; http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index ff7be0d..5f76beb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -90,6 +90,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream; @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " + "If this value is false, the other attributes will be empty and only the original message will be available in the content."), @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."), + @WritesAttribute(attribute="syslog.port", description="The port over which the Syslog message was received."), @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")}) public class ListenSyslog extends AbstractSyslogProcessor { @@ -144,8 +145,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { descriptors.add(PORT); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); - descriptors.add(CHARSET); descriptors.add(MAX_CONNECTIONS); + descriptors.add(CHARSET); this.descriptors = Collections.unmodifiableList(descriptors); final Set<Relationship> relationships = new HashSet<>(); @@ -254,6 +255,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody()); attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid())); attributes.put(SyslogAttributes.PROTOCOL.key(), context.getProperty(PROTOCOL).getValue()); + attributes.put(SyslogAttributes.PORT.key(), context.getProperty(PORT).getValue()); attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); FlowFile flowFile = session.create(); @@ -457,7 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { // Check for available connections if (currentConnections.incrementAndGet() > maxConnections){ currentConnections.decrementAndGet(); - logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() }); + logger.warn("Rejecting connection from {} because max connections has been met", + new Object[]{ socketChannel.getRemoteAddress().toString() }); IOUtils.closeQuietly(socketChannel); continue; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d328ac48/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 345e425..9795545 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -89,7 +89,7 @@ public class TestListenSyslog { Assert.assertEquals("Did not process all the datagrams", numMessages, numTransfered); MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile); + checkFlowFile(flowFile, 0, ListenSyslog.UDP_VALUE.getValue()); } finally { // unschedule to close connections @@ -131,7 +131,7 @@ public class TestListenSyslog { Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); } finally { // unschedule to close connections proc.onUnscheduled(); @@ -173,7 +173,7 @@ public class TestListenSyslog { Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - checkFlowFile(flowFile); + checkFlowFile(flowFile, 0, ListenSyslog.TCP_VALUE.getValue()); } finally { // unschedule to close connections proc.onUnscheduled(); @@ -244,7 +244,7 @@ public class TestListenSyslog { } - private void checkFlowFile(MockFlowFile flowFile) { + private void checkFlowFile(MockFlowFile flowFile, int port, String protocol) { flowFile.assertContentEquals(VALID_MESSAGE); Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key())); Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key())); @@ -253,6 +253,8 @@ public class TestListenSyslog { Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key())); Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key())); Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key())); + Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key())); + Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key())); } /**
