Repository: nifi
Updated Branches:
  refs/heads/NIFI-274 e486f4619 -> 4bdd729dc


NIFI-274 Fixing TestListenSyslog, fixing default buffer size to be bytes, 
adding syslog.protocol to attributes


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6daaad67
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6daaad67
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6daaad67

Branch: refs/heads/NIFI-274
Commit: 6daaad67fc326433083803ace6dceb481de55f8a
Parents: e486f46
Author: Bryan Bende <[email protected]>
Authored: Wed Nov 4 09:01:52 2015 -0500
Committer: Bryan Bende <[email protected]>
Committed: Wed Nov 4 09:01:52 2015 -0500

----------------------------------------------------------------------
 .../standard/AbstractSyslogProcessor.java           |  3 ++-
 .../nifi/processors/standard/ListenSyslog.java      | 16 +++++++++++-----
 .../nifi/processors/standard/TestListenSyslog.java  |  3 +++
 3 files changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6daaad67/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index f7d5eeb..ac5586d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -64,7 +64,8 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor {
         HOSTNAME("syslog.hostname"),
         SENDER("syslog.sender"),
         BODY("syslog.body"),
-        VALID("syslog.valid");
+        VALID("syslog.valid"),
+        PROTOCOL("syslog.protocol");
 
         private String key;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6daaad67/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index eafe694..ff7be0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
@@ -88,6 +89,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
                     @WritesAttribute(attribute="syslog.body", description="The body of the Syslog message, everything after the hostname."),
                     @WritesAttribute(attribute="syslog.valid", description="An indicator of whether this message matched the expected formats. " +
                             "If this value is false, the other attributes will be empty and only the original message will be available in the content."),
+                    @WritesAttribute(attribute="syslog.protocol", description="The protocol over which the Syslog message was received."),
                     @WritesAttribute(attribute="mime.type", description="The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
 public class ListenSyslog extends AbstractSyslogProcessor {
 
@@ -97,7 +99,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
                     "from an incoming connection until the buffer is full, or the connection is closed. ")
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("65507 KB")
+            .defaultValue("65507 B")
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
@@ -110,7 +112,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             .required(true)
             .build();
     public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
-            .name("Max number of TCP connections")
+            .name("Max Number of TCP Connections")
             .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .defaultValue("2")
@@ -251,6 +253,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         attributes.put(SyslogAttributes.SENDER.key(), event.getSender());
         attributes.put(SyslogAttributes.BODY.key(), event.getMsgBody());
         attributes.put(SyslogAttributes.VALID.key(), String.valueOf(event.isValid()));
+        attributes.put(SyslogAttributes.PROTOCOL.key(), context.getProperty(PROTOCOL).getValue());
         attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
 
         FlowFile flowFile = session.create();
@@ -494,8 +497,11 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         public int getPort() {
             // Return the port for the key listening for accepts
             for(SelectionKey key : selector.keys()){
-                if (key.isValid() && key.isAcceptable()) {
-                    return ((SocketChannel)key.channel()).socket().getLocalPort();
+                if (key.isValid()) {
+                    final Channel channel = key.channel();
+                    if (channel instanceof  ServerSocketChannel) {
+                        return ((ServerSocketChannel)channel).socket().getLocalPort();
+                    }
                 }
             }
             return 0;
@@ -619,7 +625,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 eof = true;
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
-             // Treat same as closed socket
+                // Treat same as closed socket
                 eof = true;
             } finally {
                 if(eof == true) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6daaad67/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 7796868..345e425 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -143,6 +143,7 @@ public class TestListenSyslog {
         final ListenSyslog proc = new ListenSyslog();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(ListenSyslog.PROTOCOL, ListenSyslog.TCP_VALUE.getValue());
+        runner.setProperty(ListenSyslog.MAX_CONNECTIONS, "5");
         runner.setProperty(ListenSyslog.PORT, "0");
 
         // schedule to start listening on a random port
@@ -210,6 +211,8 @@ public class TestListenSyslog {
                 proc.onTrigger(context, processSessionFactory);
                 numTransfered = runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID).size();
             }
+
+            // all messages should be transferred to invalid
             Assert.assertEquals("Did not process all the messages", numMessages, numTransfered);
 
         } finally {

Reply via email to