Repository: nifi Updated Branches: refs/heads/NIFI-274 e486f4619 -> 4bdd729dc
NIFI-274 Fixing TestListenSyslog, fixing default buffer size to be bytes, adding syslog.protocol to attributes Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6daaad67 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6daaad67 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6daaad67 Branch: refs/heads/NIFI-274 Commit: 6daaad67fc326433083803ace6dceb481de55f8a Parents: e486f46 Author: Bryan Bende <[email protected]> Authored: Wed Nov 4 09:01:52 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Wed Nov 4 09:01:52 2015 -0500 ---------------------------------------------------------------------- .../standard/AbstractSyslogProcessor.java | 3 ++- .../nifi/processors/standard/ListenSyslog.java | 16 +++++++++++----- .../nifi/processors/standard/TestListenSyslog.java | 3 +++ 3 files changed, 16 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6daaad67/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java index f7d5eeb..ac5586d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java @@ -64,7 +64,8 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor { HOSTNAME("syslog.hostname"), SENDER("syslog.sender"), BODY("syslog.body"), - VALID("syslog.valid"); + VALID("syslog.valid"), + PROTOCOL("syslog.protocol"); private String key; http://git-wip-us.apache.org/repos/asf/nifi/blob/6daaad67/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index eafe694..ff7be0d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; +import java.nio.channels.Channel; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; @@ -88,6 +89,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream; @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."), @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " + "If this value is false, the other attributes will be empty and only the original message will be available in the content."), + @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."), @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")}) public class ListenSyslog extends AbstractSyslogProcessor { @@ -97,7 +99,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " + "from an incoming connection until the buffer is full, or the connection is closed. ") .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("65507 KB") + .defaultValue("65507 B") .required(true) .build(); public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() @@ -110,7 +112,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { .required(true) .build(); public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max number of TCP connections") + .name("Max Number of TCP Connections") .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") .addValidator(StandardValidators.createLongValidator(1, 65535, true)) .defaultValue("2") @@ -251,6 +253,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { attributes.put(SyslogAttributes.SENDER.key(), event.getSender()); attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody()); attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid())); + attributes.put(SyslogAttributes.PROTOCOL.key(), context.getProperty(PROTOCOL).getValue()); attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); FlowFile flowFile = session.create(); @@ -494,8 +497,11 @@ public class ListenSyslog extends AbstractSyslogProcessor { public int getPort() { // Return the port for the key listening for accepts for(SelectionKey key : selector.keys()){ - if (key.isValid() && key.isAcceptable()) { - return ((SocketChannel)key.channel()).socket().getLocalPort(); + if (key.isValid()) { + final Channel channel = key.channel(); + if (channel instanceof ServerSocketChannel) { + return ((ServerSocketChannel)channel).socket().getLocalPort(); + } } } return 0; @@ -619,7 +625,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { eof = true; } catch (IOException e) { logger.error("Error reading from channel", e); - // Treat same as closed socket + // Treat same as closed socket eof = true; } finally { if(eof == true) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6daaad67/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 7796868..345e425 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -143,6 +143,7 @@ public class TestListenSyslog { final ListenSyslog proc = new ListenSyslog(); final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue()); + runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5"); runner.setProperty(ListenSyslog.PORT, "0"); // schedule to start listening on a random port @@ -210,6 +211,8 @@ public class TestListenSyslog { proc.onTrigger(context, processSessionFactory); numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size(); } + + // all messages should be transferred to invalid Assert.assertEquals("Did not process all the messages", numMessages, numTransfered); } finally {
